diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 4e507436b942..6a2bffd34d6e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -66,6 +66,10 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT
     return init(getDefaultHadoopConf(), basePath, tableType);
   }
 
+  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, Properties properties) throws IOException {
+    return init(getDefaultHadoopConf(), basePath, tableType, properties);
+  }
+
   public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 9ed0dfb807eb..3d96c1cafad1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -263,7 +263,7 @@ private void testWriteToHoodie(
       client.getJobExecutionResult().get();
     }
 
-    TestData.checkWrittenFullData(tempFile, expected);
+    TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
   private void testWriteToHoodieWithCluster(
@@ -327,7 +327,7 @@ private void testWriteToHoodieWithCluster(
     // wait for the streaming job to finish
     client.getJobExecutionResult().get();
 
-    TestData.checkWrittenFullData(tempFile, expected);
+    TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
   public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
@@ -449,7 +449,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {
     builder.sink(dataStream, false);
 
     execute(execEnv, true, "Api_Sink_Test");
-    TestData.checkWrittenFullData(tempFile, EXPECTED);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index e67d2ab35c76..9a6aaf01e669 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -250,7 +250,7 @@ public void testInsertAppendMode() throws Exception {
         .checkpoint(2)
         .assertNextEvent()
         .checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
         .end();
   }
 
@@ -282,7 +282,7 @@ public void testInsertClustering() throws Exception {
         .checkpoint(2)
         .handleEvents(2)
         .checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
         .end();
   }
 
@@ -305,7 +305,7 @@ public void testInsertAsyncClustering() throws Exception {
         .checkpoint(2)
         .handleEvents(1)
         .checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
         .end();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index aa31d859bbc4..1fe48e6700b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -84,6 +87,60 @@ public void testIndexStateBootstrapWithCompactionScheduled() throws Exception {
     validateIndexLoaded();
   }
 
+  @Test
+  public void testEventTimeAvroPayloadMergeRead() throws Exception {
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.OPERATION, "upsert");
+    conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
+    conf.set(FlinkOptions.PRE_COMBINE, true);
+    conf.set(FlinkOptions.PRECOMBINE_FIELD, "ts");
+    conf.set(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
+    HashMap<String, String> mergedExpected = new HashMap<>(EXPECTED1);
+    mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1, id2,par1,id2,Stephen,33,2,par1]");
+    TestHarness.instance().preparePipeline(tempFile, conf)
+        .consume(TestData.DATA_SET_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED1, 4)
+        .consume(TestData.DATA_SET_DISORDER_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(mergedExpected, 4)
+        .consume(TestData.DATA_SET_SINGLE_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(3)
+        .assertNextEvent()
+        .checkpointComplete(3)
+        .checkWrittenData(mergedExpected, 4)
+        .end();
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {1, 2})
+  public void testOnlyBaseFileOrOnlyLogFileRead(int compactionDeltaCommits) throws Exception {
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.OPERATION, "upsert");
+    conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits);
+    TestHarness.instance().preparePipeline(tempFile, conf)
+        .consume(TestData.DATA_SET_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED1, 4)
+        .end();
+  }
+
   @Override
   public void testInsertClustering() {
     // insert clustering is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 43e4ed511452..37527a725d17 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -175,7 +175,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     env.execute("flink_hudi_compaction");
     writeClient.close();
 
-    TestData.checkWrittenFullData(tempFile, EXPECTED1);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
 
   @ParameterizedTest
@@ -216,7 +216,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
 
     asyncCompactionService.shutDown();
 
-    TestData.checkWrittenFullData(tempFile, EXPECTED2);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
  }
 
   @ParameterizedTest
@@ -306,7 +306,7 @@ public void processElement(CompactionCommitEvent event, ProcessFunction writeClient) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b75225821923..b6ae0767d68a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -22,20 +22,14 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 
-import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.table.data.RowData;
@@ -338,9 +332,7 @@ public TestHarness checkWrittenData(Map<String, String> expected) throws Excepti
     public TestHarness checkWrittenData(
         Map<String, String> expected,
         int partitions) throws Exception {
-      if (OptionsResolver.isCowTable(conf)
-          || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
-          || OptionsResolver.isAppendMode(conf)) {
+      if (OptionsResolver.isCowTable(conf)) {
         TestData.checkWrittenData(this.baseFile, expected, partitions);
       } else {
         checkWrittenDataMor(baseFile, expected, partitions);
@@ -349,15 +341,12 @@ public TestHarness checkWrittenData(
     }
 
     private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
-      HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, HadoopConfigurations.getHadoopConf(conf));
-      Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
-      String latestInstant = lastCompleteInstant();
       FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
-      TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+      TestData.checkWrittenDataMOR(fs, baseFile, expected, partitions);
     }
 
-    public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
-      TestData.checkWrittenFullData(this.baseFile, expected);
+    public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
+      TestData.checkWrittenDataCOW(this.baseFile, expected);
       return this;
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d03e1b2eb527..d0cf143318e9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -18,13 +18,22 @@
 
 package org.apache.hudi.utils;
 
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -49,7 +58,6 @@
 import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.Strings;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
 
@@ -60,15 +68,19 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.function.Predicate;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.function.Predicate;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_PROPERTIES_FILE;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -278,6 +290,17 @@ public class TestData {
           insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
               TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
+  );
+
   public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
       // DISORDER UPDATE
       updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
@@ -598,14 +621,14 @@ public static void checkWrittenAllData(
    * @param basePath The file base to check, should be a directory
    * @param expected The expected results mapping, the key should be the partition path
    */
-  public static void checkWrittenFullData(
+  public static void checkWrittenDataCOW(
       File basePath,
       Map<String, List<String>> expected) throws IOException {
 
     // 1. init flink table
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+    HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
 
     // 2. check each partition data
     expected.forEach((partition, partitionDataSet) -> {
@@ -638,49 +661,90 @@ public static void checkWrittenFullData(
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param fs            The file system
-   * @param latestInstant The latest committed instant of current table
-   * @param baseFile      The file base to check, should be a directory
-   * @param expected      The expected results mapping, the key should be the partition path
-   * @param partitions    The expected partition number
-   * @param schema        The read schema
+   * @param fs         The file system
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   * @param partitions The expected partition number
    */
   public static void checkWrittenDataMOR(
       FileSystem fs,
-      String latestInstant,
       File baseFile,
       Map<String, String> expected,
-      int partitions,
-      Schema schema) {
+      int partitions) throws Exception {
     assert baseFile.isDirectory() : "Base path should be a directory";
-    FileFilter partitionFilter = file -> !file.getName().startsWith(".");
-    File[] partitionDirs = baseFile.listFiles(partitionFilter);
+    String basePath = baseFile.getAbsolutePath();
+    File hoodiePropertiesFile = new File(baseFile + "/" + METAFOLDER_NAME + "/" + HOODIE_PROPERTIES_FILE);
+    assert hoodiePropertiesFile.exists();
+    // 1. init flink table
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .fromFile(hoodiePropertiesFile)
+        .withPath(basePath).build();
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, config.getProps());
+    HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+
+    String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants()
+        .lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+    assertNotNull(latestInstant, "No completed commit under table path" + basePath);
+
+    File[] partitionDirs = baseFile.listFiles(file -> !file.getName().startsWith(".") && file.isDirectory());
     assertNotNull(partitionDirs);
-    assertThat(partitionDirs.length, is(partitions));
+    assertThat("The partitions number should be: " + partitions, partitionDirs.length, is(partitions));
+
+    // 2. check each partition data
+    final int[] requiredPos = IntStream.range(0, schema.getFields().size()).toArray();
     for (File partitionDir : partitionDirs) {
-      File[] dataFiles = partitionDir.listFiles(file ->
-          file.getName().contains(".log.") && !file.getName().startsWith(".."));
-      assertNotNull(dataFiles);
-      List<String> logPaths = Arrays.stream(dataFiles)
-          .sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
-              .compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
-          .map(File::getAbsolutePath).collect(Collectors.toList());
-      HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
-      List<String> readBuffer = scanner.getRecords().values().stream()
-          .map(hoodieRecord -> {
-            try {
-              // in case it is a delete
-              GenericRecord record = (GenericRecord) hoodieRecord.getData()
-                  .getInsertValue(schema, new Properties())
-                  .orElse(null);
-              return record == null ? (String) null : filterOutVariables(record);
-            } catch (IOException e) {
-              throw new RuntimeException(e);
+      List<String> readBuffer = new ArrayList<>();
+      List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
+      for (FileSlice fileSlice : fileSlices) {
+        HoodieMergedLogRecordScanner scanner = null;
+        List<String> logPaths = fileSlice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(logFile -> logFile.getPath().toString())
+            .collect(Collectors.toList());
+        if (logPaths.size() > 0) {
+          scanner = getScanner(fs, basePath, logPaths, schema, latestInstant);
+        }
+        String baseFilePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+        Set<String> keyToSkip = new HashSet<>();
+        if (baseFilePath != null) {
+          // read the base file first
+          GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+          ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(baseFilePath)).build();
+          GenericRecord currentRecord = reader.read();
+          while (currentRecord != null) {
+            String curKey = currentRecord.get(HOODIE_RECORD_KEY_COL_POS).toString();
+            if (scanner != null && scanner.getRecords().containsKey(curKey)) {
+              keyToSkip.add(curKey);
+              // merge row with log.
+              final HoodieAvroRecord<?> record = (HoodieAvroRecord<?>) scanner.getRecords().get(curKey);
+              Option<IndexedRecord> combineResult = record.getData().combineAndGetUpdateValue(currentRecord, schema, config.getProps());
+              if (combineResult.isPresent()) {
+                GenericRecord avroRecord = buildAvroRecordBySchema(combineResult.get(), schema, requiredPos, recordBuilder);
+                readBuffer.add(filterOutVariables(avroRecord));
+              }
+            } else {
+              readBuffer.add(filterOutVariables(currentRecord));
             }
-          })
-          .filter(Objects::nonNull)
-          .sorted(Comparator.naturalOrder())
-          .collect(Collectors.toList());
+            currentRecord = reader.read();
+          }
+        }
+        // read the remaining log data.
+        if (scanner != null) {
+          for (String curKey : scanner.getRecords().keySet()) {
+            if (!keyToSkip.contains(curKey)) {
+              Option<IndexedRecord> record = (Option<IndexedRecord>) scanner.getRecords()
+                  .get(curKey).getData()
+                  .getInsertValue(schema, config.getProps());
+              if (record.isPresent()) {
+                readBuffer.add(filterOutVariables(record.get()));
+              }
+            }
+          }
+        }
+      }
+      // Ensure that to write and read sequences are consistent.
+      readBuffer.sort(String::compareTo);
       assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
     }
   }
@@ -722,7 +786,7 @@ private static String filterOutVariables(GenericRecord genericRecord) {
     fields.add(genericRecord.get("age").toString());
     fields.add(genericRecord.get("ts").toString());
     fields.add(genericRecord.get("partition").toString());
-    return Strings.join(fields, ",");
+    return String.join(",", fields);
   }
 
   public static BinaryRowData insertRow(Object... fields) {