From 285de8f10369c872b5ccdbcf56e203b498a7d54b Mon Sep 17 00:00:00 2001
From: voonhous
Date: Tue, 21 Mar 2023 13:47:48 +0800
Subject: [PATCH 1/2] [HUDI-5822] Fix FileId not found exception when FileId is passed to HoodieMergeHandle

---
 .../common/testutils/FileCreateUtils.java    |  16 +-
 .../bucket/BucketStreamWriteFunction.java    |  17 +-
 .../sink/bucket/ITTestBucketStreamWrite.java | 184 ++++++++++++++++++
 3 files changed, 206 insertions(+), 11 deletions(-)
 create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 9a2dba04681a5..cacb60b6175ab 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -101,7 +101,12 @@ public static String markerFileName(String instantTime, String fileId, IOType io
   }
 
   public static String markerFileName(String instantTime, String fileId, IOType ioType, String fileExtension) {
-    return String.format("%s_%s_%s%s%s.%s", fileId, WRITE_TOKEN, instantTime, fileExtension, HoodieTableMetaClient.MARKER_EXTN, ioType);
+    return markerFileName(instantTime, fileId, ioType, fileExtension, WRITE_TOKEN);
+  }
+
+  public static String markerFileName(String instantTime, String fileId, IOType ioType, String fileExtension, String writeToken) {
+    return String.format("%s_%s_%s%s%s.%s", fileId, writeToken, instantTime, fileExtension,
+        HoodieTableMetaClient.MARKER_EXTN, ioType);
   }
 
   private static void createMetaFile(String basePath, String instantTime, String suffix, FileSystem fs) throws IOException {
@@ -356,9 +361,14 @@ public static void createLogFile(String basePath, String partitionPath, String i
 
   public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileId, IOType ioType)
       throws IOException {
-    Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+    return createMarkerFile(basePath, partitionPath, instantTime, instantTime, fileId, ioType, WRITE_TOKEN);
+  }
+
+  public static String createMarkerFile(String basePath, String partitionPath, String commitInstant,
+      String instantTime, String fileId, IOType ioType, String writeToken) throws IOException {
+    Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, commitInstant, partitionPath);
     Files.createDirectories(parentPath);
-    Path markerFilePath = parentPath.resolve(markerFileName(instantTime, fileId, ioType));
+    Path markerFilePath = parentPath.resolve(markerFileName(instantTime, fileId, ioType, BASE_FILE_EXTENSION, writeToken));
     if (Files.notExists(markerFilePath)) {
       Files.createFile(markerFilePath);
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index dcbb30fb8bc44..e66a64cd66bdb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -157,17 +157,18 @@ private void bootstrapIndexIfNeed(String partition) {
 
     // Load existing fileID belongs to this task
     Map<Integer, String> bucketToFileIDMap = new HashMap<>();
-    this.writeClient.getHoodieTable().getHoodieView().getAllFileGroups(partition).forEach(fileGroup -> {
-      String fileID = fileGroup.getFileGroupId().getFileId();
-      int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
+    this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
+      String fileId = fileSlice.getFileId();
+      int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
       if (isBucketToLoad(bucketNumber, partition)) {
-        LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
+        LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
+        // Ensure that one bucketId has only ONE fileId
         if (bucketToFileIDMap.containsKey(bucketNumber)) {
-          throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found "
-              + "during the BucketStreamWriteFunction index bootstrap.", fileID, bucketNumber, partition));
+          throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found "
+              + "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
         } else {
-          LOG.info(String.format("Adding fileID %s to the bucket %s of partition %s.", fileID, bucketNumber, partition));
-          bucketToFileIDMap.put(bucketNumber, fileID);
+          LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, partition));
+          bucketToFileIDMap.put(bucketNumber, fileId);
         }
       }
     });
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
new file mode 100644
index 0000000000000..f7ad6bf5acc7f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Integration test cases for {@link BucketStreamWriteFunction}.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestBucketStreamWrite {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testBucketStreamWriteAfterRollbackFirstFileGroupCreation(boolean isCow) throws Exception {
+    // this test is to ensure that the correct fileId can be fetched when recovering from a rollback when a new
+    // fileGroup is created for a bucketId
+    String tablePath = tempFile.getAbsolutePath();
+    doWrite(tablePath, isCow);
+    doDeleteCommit(tablePath, isCow);
+    doWrite(tablePath, isCow);
+    doWrite(tablePath, isCow);
+
+    if (isCow) {
+      TestData.checkWrittenData(tempFile, EXPECTED, 4);
+    } else {
+      FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration());
+      TestData.checkWrittenDataMOR(fs, tempFile, EXPECTED, 4);
+    }
+  }
+
+  private static void doDeleteCommit(String tablePath, boolean isCow) throws Exception {
+    // make configuration and setAvroSchema
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tablePath;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableType().name());
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // should only contain one instant
+    HoodieTimeline activeCompletedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
+    assertEquals(1, activeCompletedTimeline.getInstants().size());
+
+    // rollback path structure: tablePath/.hoodie/.temp/${commitInstant}/${partition}/${fileGroup}_${fileInstant}.parquet.marker.APPEND
+    HoodieInstant instant = activeCompletedTimeline.getInstants().get(0);
+    String commitInstant = instant.getTimestamp();
+    String filename = activeCompletedTimeline.getInstants().get(0).getFileName();
+
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+        .fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+
+    // delete successful commit to simulate an unsuccessful write
+    FileSystem fs = metaClient.getFs();
+    Path path = new Path(metaClient.getMetaPath() + Path.SEPARATOR + filename);
+    fs.delete(path);
+
+    // marker types are different for COW and MOR
+    IOType ioType = isCow ? IOType.CREATE : IOType.APPEND;
+
+    commitMetadata.getFileIdAndRelativePaths().forEach((fileId, relativePath) -> {
+      // hacky way to reconstruct markers ¯\_(ツ)_/¯
+      String[] partitionFileNameSplit = relativePath.split("/");
+      String fileInstant = FSUtils.getCommitTime(partitionFileNameSplit[1]);
+      String partition = partitionFileNameSplit[0];
+      String writeToken = isCow ? getWriteToken(partitionFileNameSplit[1]) : FSUtils.getWriteTokenFromLogPath(new Path(relativePath));
+      try {
+        FileCreateUtils.createMarkerFile(tablePath, partition, commitInstant, fileInstant, fileId, ioType, writeToken);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  private static String getWriteToken(String relativeFilePath) {
+    Pattern writeTokenPattern = Pattern.compile("_((\\d+)-(\\d+)-(\\d+))_");
+    Matcher matcher = writeTokenPattern.matcher(relativeFilePath);
+    if (!matcher.find()) {
+      throw new RuntimeException("Invalid relative file path: " + relativeFilePath);
+    }
+    return matcher.group(1);
+  }
+
+  private static void doWrite(String path, boolean isCow) throws InterruptedException, ExecutionException {
+    // create hoodie table and perform writes
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), path);
+
+    // use bucket index
+    options.put(FlinkOptions.TABLE_TYPE.key(), isCow ? FlinkOptions.TABLE_TYPE_COPY_ON_WRITE : FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.INDEX_TYPE.key(), IndexType.BUCKET.name());
+    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "1");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+  }
+}
\ No newline at end of file

From 6aa7bc9a0209cef639ae47d9c7814851dc8c06cb Mon Sep 17 00:00:00 2001
From: voonhous
Date: Wed, 22 Mar 2023 10:45:57 +0800
Subject: [PATCH 2/2] [HUDI-5822] Fix FileId not found exception when FileId is passed to HoodieMergeHandle

---
 .../org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index e66a64cd66bdb..afd55639c3c15 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -162,7 +162,7 @@ private void bootstrapIndexIfNeed(String partition) {
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
       if (isBucketToLoad(bucketNumber, partition)) {
         LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
-        // Ensure that one bucketId has only ONE fileId
+        // Validate that one bucketId has only ONE fileId
        if (bucketToFileIDMap.containsKey(bucketNumber)) {
          throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found "
              + "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
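
Reviewer note (not part of the patch): the bootstrap change above hinges on the bucket-index convention that a fileId carries its bucket number as a zero-padded prefix, and that each bucket of a partition resolves to exactly one fileId among the latest file slices. The following is a minimal, self-contained Java sketch of that mapping and the duplicate check; the 8-digit prefix, the example fileIds, and the class/method names are assumptions made for illustration, and the helper only mirrors the role of BucketIdentifier.bucketIdFromFileId rather than reproducing Hudi's implementation.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch, not Hudi code: illustrates the bucket -> fileId index that
// bootstrapIndexIfNeed builds from the latest file slices of a partition.
public class BucketIndexBootstrapSketch {

  // Hypothetical stand-in for BucketIdentifier.bucketIdFromFileId: the bucket number is
  // assumed to be encoded as the leading zero-padded digits of the fileId.
  static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(fileId.substring(0, 8));
  }

  // Build the bucket -> fileId mapping, failing fast when two fileIds land in the same
  // bucket (the duplicate case the patched code guards against).
  static Map<Integer, String> buildBucketToFileIdMap(Iterable<String> latestFileSliceIds) {
    Map<Integer, String> bucketToFileId = new HashMap<>();
    for (String fileId : latestFileSliceIds) {
      int bucket = bucketIdFromFileId(fileId);
      String previous = bucketToFileId.putIfAbsent(bucket, fileId);
      if (previous != null) {
        throw new IllegalStateException(String.format(
            "Duplicate fileId %s and %s for bucket %s", previous, fileId, bucket));
      }
    }
    return bucketToFileId;
  }

  public static void main(String[] args) {
    // The fileIds below are made up; only the zero-padded bucket prefix matters here.
    Map<Integer, String> index = buildBucketToFileIdMap(Arrays.asList(
        "00000000-1111-2222-3333-444444444444-0",
        "00000001-1111-2222-3333-444444444444-0"));
    System.out.println(index); // two buckets, one fileId each
  }
}

As I read the patch, iterating getLatestFileSlices instead of getAllFileGroups keeps this mapping limited to file slices that still exist after a rollback, which is what lets the recovered write hand a valid fileId to HoodieMergeHandle.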