From 4d719b083d03d4712359e26a8f2e200355c40622 Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Fri, 19 Nov 2021 14:24:44 +0800
Subject: [PATCH] [HUDI-2798] Fix flink query operation fields

---
 .../org/apache/hudi/io/HoodieMergeHandle.java | 23 ++++---
 .../table/action/compact/HoodieCompactor.java |  1 +
 .../hudi/source/IncrementalInputSplits.java   | 67 +++++++++++++------
 .../format/mor/MergeOnReadInputFormat.java    | 10 +--
 .../hudi/table/format/TestInputFormat.java    | 17 +++--
 5 files changed, 78 insertions(+), 40 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index b13b561f5dff..d1d67efff4b9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -250,11 +250,17 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
           + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
+  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+    boolean isDelete = false;
     if (indexedRecord.isPresent()) {
       updatedRecordsWritten++;
+      GenericRecord record = (GenericRecord) indexedRecord.get();
+      if (oldRecord != record) {
+        // the incoming record is chosen
+        isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+      }
     }
-    return writeRecord(hoodieRecord, indexedRecord);
+    return writeRecord(hoodieRecord, indexedRecord, isDelete);
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -264,12 +270,16 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
       return;
     }
-    if (writeRecord(hoodieRecord, insertRecord)) {
+    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
       insertRecordsWritten++;
     }
   }
 
   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
+    return writeRecord(hoodieRecord, indexedRecord, false);
+  }
+
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
     Option recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -277,11 +287,8 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ ... @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
         .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
         .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+        .withOperationField(config.allowOperationMetadataField())
         .build();
     if (!scanner.iterator().hasNext()) {
       scanner.close();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 653e182bfad9..d0fcc854dbe4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -44,6 +44,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -156,28 +157,52 @@ public Result inputSplits(
     }
 
     String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
-        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
-    List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
-    if (archivedMetadataList.size() > 0) {
-      LOG.warn("\n"
-          + "--------------------------------------------------------------------------------\n"
-          + "---------- caution: the reader has fall behind too much from the writer,\n"
-          + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
-          + "--------------------------------------------------------------------------------");
-    }
-    List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
-        // IMPORTANT: the merged metadata list must be in ascending order by instant time
-        ? mergeList(archivedMetadataList, activeMetadataList)
-        : activeMetadataList;
-
-    Set<String> writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
-    // apply partition push down
-    if (this.requiredPartitions != null) {
-      writePartitions = writePartitions.stream()
-          .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
+
+    Set<String> writePartitions;
+    final FileStatus[] fileStatuses;
+
+    if (instantRange == null) {
+      // reading from the earliest, scans the partitions and files directly.
+      FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
+      if (this.requiredPartitions != null) {
+        // apply partition push down
+        fileIndex.setPartitionPaths(this.requiredPartitions);
+      }
+      writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
+      if (writePartitions.size() == 0) {
+        LOG.warn("No partitions found for reading in user provided path.");
+        return Result.EMPTY;
+      }
+      fileStatuses = fileIndex.getFilesInPartitions();
+    } else {
+      List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+      List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+      if (archivedMetadataList.size() > 0) {
+        LOG.warn("\n"
+            + "--------------------------------------------------------------------------------\n"
+            + "---------- caution: the reader has fall behind too much from the writer,\n"
+            + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+            + "--------------------------------------------------------------------------------");
+      }
+      List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+          // IMPORTANT: the merged metadata list must be in ascending order by instant time
+          ? mergeList(archivedMetadataList, activeMetadataList)
+          : activeMetadataList;
+
+      writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
+      // apply partition push down
+      if (this.requiredPartitions != null) {
+        writePartitions = writePartitions.stream()
+            .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
+      }
+      if (writePartitions.size() == 0) {
+        LOG.warn("No partitions found for reading in user provided path.");
+        return Result.EMPTY;
+      }
+      fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
     }
-    FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+
     if (fileStatuses.length == 0) {
       LOG.warn("No files found for reading in user provided path.");
       return Result.EMPTY;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 95c4bd4a5431..7a72bca0582f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -683,13 +683,18 @@ public boolean reachedEnd() throws IOException {
             // deleted
             continue;
           } else {
+            final RowKind rowKind = FormatUtils.getRowKindSafely(mergedAvroRecord.get(), this.operationPos);
+            if (!emitDelete && rowKind == RowKind.DELETE) {
+              // deleted
+              continue;
+            }
             GenericRecord avroRecord = buildAvroRecordBySchema(
                 mergedAvroRecord.get(),
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
             this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
-            FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos);
+            this.currentRecord.setRowKind(rowKind);
             return false;
           }
         }
@@ -746,9 +751,6 @@ private Option mergeRowWithLog(
       RowData curRow,
       String curKey) throws IOException {
     final HoodieRecord record = scanner.getRecords().get(curKey);
-    if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
-      return Option.empty();
-    }
     GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
     return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f4da947f3bfc..e6424a1abb75 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -39,6 +39,7 @@
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -221,8 +222,9 @@ void testReadBaseAndLogFilesWithDeletes() throws Exception {
     assertThat(actual2, is(expected2));
   }
 
-  @Test
-  void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean compact) throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
     beforeEach(HoodieTableType.MERGE_ON_READ, options);
@@ -233,7 +235,7 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
     TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
 
     // write another commit using logs and read again.
-    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, compact);
     TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);
 
     InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
@@ -242,9 +244,11 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
     // when isEmitDelete is false.
     List<RowData> result1 = readData(inputFormat);
 
+    final String rowKind = compact ? "I" : "U";
+    final String expected = "[+" + rowKind + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+
     final String actual1 = TestData.rowDataToString(result1);
-    final String expected1 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
-    assertThat(actual1, is(expected1));
+    assertThat(actual1, is(expected));
 
     // refresh the input format and set isEmitDelete to true.
     this.tableSource.reset();
@@ -254,8 +258,7 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
     List<RowData> result2 = readData(inputFormat);
 
     final String actual2 = TestData.rowDataToString(result2);
-    final String expected2 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
-    assertThat(actual2, is(expected2));
+    assertThat(actual2, is(expected));
   }
 
   @Test
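
Note for reviewers (illustrative sketch, not part of the patch): the HoodieMergeHandle change encodes a single rule: a DELETE carried in the incoming record's operation field is honored only when the merge result actually chose the incoming record over the stored base-file record. The snippet below restates that rule in isolation; the class and method names are invented for the example, while GenericRecord and HoodieOperation are the same types the patch already relies on.

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieOperation;

final class MergeDeleteRuleSketch {
  // Returns true only when the merged value picked the incoming record and that
  // record's operation marks a delete; a merge that kept the old record never deletes.
  static boolean resolveDelete(GenericRecord oldRecord, GenericRecord mergedRecord, HoodieOperation incomingOperation) {
    if (mergedRecord == oldRecord) {
      // the old base-file record won the merge, so the row is kept as an update/insert
      return false;
    }
    return HoodieOperation.isDelete(incomingOperation);
  }
}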