diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 76e9e60ee0dc4..3ca04986fe9eb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -38,6 +38,7 @@
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
@@ -63,6 +64,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.IntStream;
 
@@ -634,10 +636,12 @@ static class MergeIterator implements RecordIterator {
 
     private final Set<String> keyToSkip = new HashSet<>();
 
+    private final Properties payloadProps;
+
     private RowData currentRecord;
 
     MergeIterator(
-        Configuration finkConf,
+        Configuration flinkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -650,7 +654,8 @@ static class MergeIterator implements RecordIterator {
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
+      this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
@@ -751,7 +756,7 @@ private Option<IndexedRecord> mergeRowWithLog(
         String curKey) throws IOException {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+      return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4e819ecd7b93a..7c7cdcc8adb37 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -228,12 +228,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .withClientNumRetries(30)
             .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
             .build())
-        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
-            .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-            .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
-            .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
-            .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-            .build())
+        .withPayloadConfig(getPayloadConfig(conf))
         .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
         .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
         .withAutoCommit(false)
@@ -251,6 +246,18 @@ public static HoodieWriteConfig getHoodieClientConfig(
     return writeConfig;
   }
 
+  /**
+   * Returns the payload config with given configuration.
+   */
+  public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
+    return HoodiePayloadConfig.newBuilder()
+        .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+        .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+        .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .build();
+  }
+
   /**
    * Converts the give {@link Configuration} to {@link TypedProperties}.
    * The default values are also set up.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index b663a4af3e318..9f821619089fd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -473,6 +474,31 @@ void testReadIncrementally(HoodieTableType tableType) throws Exception {
     TestData.assertRowDataEquals(actual6, Collections.emptyList());
   }
 
+  @Test
+  void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write base file first with compaction.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, conf);
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    final String baseResult = TestData.rowDataToString(readData(inputFormat));
+    String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+    assertThat(baseResult, is(expected));
+
+    // write another commit using logs and read again.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat));
+    assertThat(baseMergeLogFileResult, is(expected));
+  }
+
   @Test
   void testReadArchivedCommitsIncrementally() throws Exception {
     Map<String, String> options = new HashMap<>();
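
Reviewer note (not part of the patch): the fix works because `StreamerUtil.getPayloadConfig(conf)` now derives the payload properties from `FlinkOptions.PRECOMBINE_FIELD`, and `MergeIterator` passes those properties into `combineAndGetUpdateValue`, so an ordering-aware payload such as `EventTimeAvroPayload` can compare event times when merging log records against the base file instead of always preferring the log record. The sketch below is a minimal, hypothetical illustration of that wiring; the class name `PayloadPropsSketch` and the field name "ts" are made up, while the Hudi/Flink classes and methods are the ones touched by this diff.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

import java.util.Properties;

// Stand-alone sketch: builds the same payload properties the MergeIterator now
// receives, assuming hudi-flink (and its hudi-common dependency) on the classpath.
public class PayloadPropsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // "ts" is a hypothetical precombine/event-time field in the table schema.
    conf.setString(FlinkOptions.PRECOMBINE_FIELD, "ts");
    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());

    // Same helper added to StreamerUtil by this patch.
    Properties payloadProps = StreamerUtil.getPayloadConfig(conf).getProps();

    // Expected to contain the payload ordering/event-time field entries
    // (exact keys come from HoodiePayloadConfig), which combineAndGetUpdateValue
    // uses to keep the record with the greater ordering value.
    payloadProps.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```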