diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 767968629d187..86240fa598294 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -73,7 +73,6 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; -import java.util.Properties; import java.util.Set; import java.util.function.Function; import java.util.stream.IntStream; @@ -767,7 +766,7 @@ public boolean hasNext() { final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString(); if (scanner.getRecords().containsKey(curKey)) { keyToSkip.add(curKey); - Option mergedAvroRecord = mergeRowWithLog(currentRecord, curKey); + Option> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey); if (!mergedAvroRecord.isPresent()) { // deleted continue; @@ -838,13 +837,13 @@ public void close() { } } - private Option mergeRowWithLog(RowData curRow, String curKey) { - final HoodieAvroRecord record = (HoodieAvroRecord) scanner.getRecords().get(curKey); + @SuppressWarnings("unchecked") + private Option> mergeRowWithLog(RowData curRow, String curKey) { + final HoodieRecord record = scanner.getRecords().get(curKey); GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(historyAvroRecord); try { - Option resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft); - return resultRecord.get().toIndexedRecord(tableSchema, new Properties()); + return recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft); } catch (IOException e) { throw new HoodieIOException("Merge base and delta payloads exception", e); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index d1b5516d1ad77..7c0763441668d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -952,6 +952,14 @@ void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception { assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat)); assertThat(baseMergeLogFileResult, is(expected)); + + // write another commit with delete messages + TestData.writeData(TestData.DATA_SET_SINGLE_DELETE, conf); + this.tableSource.reset(); + inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + final String baseMergeLogFileResult2 = TestData.rowDataToString(readData(inputFormat)); + assertThat(baseMergeLogFileResult2, is("[]")); } @Test diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 7199ba069fc28..7f9ad1089413d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -330,6 +330,10 @@ public class TestData { insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1"))); + public static List DATA_SET_SINGLE_DELETE = Collections.singletonList( + deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(5), StringData.fromString("par1"))); + public static List DATA_SET_DISORDER_INSERT = Arrays.asList( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(3), StringData.fromString("par1")),