From 657c06971bfccdfb905d3959fdfc911df11db662 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Wed, 26 Jan 2022 10:08:40 +0800 Subject: [PATCH 1/6] ready to test --- .../view/AbstractTableFileSystemView.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 92e6171b68327..cea4cc345f65c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -380,6 +380,20 @@ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) { && baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey()); } + /** + * With async clustering, it is possible to see partial/complete base-files due to inflight clustering; ignore those + * base-files. + * + * @param baseFile base File + */ + protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) { + final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath()); + Option pendingClusteringInstant = getPendingClusteringInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId())); + + return (pendingClusteringInstant.isPresent()) && (null != pendingClusteringInstant.get().getTimestamp()) + && baseFile.getCommitTime().equals(pendingClusteringInstant.get().getTimestamp()); + } + /** + * Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction + * Instant. 
@@ -492,7 +506,7 @@ public final Stream getLatestBaseFilesBeforeOrOn(String partitio .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles() .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime )) - .filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst())) + .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst())) .filter(Option::isPresent).map(Option::get) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df)); } finally { @@ -511,7 +525,7 @@ public final Option getBaseFileOn(String partitionStr, String in } else { return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles() .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS, - instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null)) + instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null)) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df)); } } finally { @@ -547,7 +561,7 @@ public final Stream getLatestBaseFilesInRange(List commi .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn)) .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional( fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime()) - && !isBaseFileDueToPendingCompaction(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent()) + && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent()) .map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get())); } finally { readLock.unlock(); @@ -563,7 +577,7 @@ public final 
Stream getAllBaseFiles(String partitionStr) { return fetchAllBaseFiles(partitionPath) .filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId())) .filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime())) - .filter(df -> !isBaseFileDueToPendingCompaction(df)) + .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df)); } finally { readLock.unlock(); @@ -953,7 +967,7 @@ public Stream fetchLatestBaseFiles(final String partitionPath) { protected Option getLatestBaseFile(HoodieFileGroup fileGroup) { return Option - .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()); + .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()); } /** From 4c7e165439004360ef246e4c89d4e1e3d858b3b7 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 14 Feb 2022 10:33:19 +0800 Subject: [PATCH 2/6] ready to test --- .../common/table/view/AbstractTableFileSystemView.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index cea4cc345f65c..3d34ff4c5caf4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -387,11 +387,9 @@ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) { * @param baseFile base File */ protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) { - final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath()); - Option 
pendingClusteringInstant = getPendingClusteringInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId())); - - return (pendingClusteringInstant.isPresent()) && (null != pendingClusteringInstant.get().getTimestamp()) - && baseFile.getCommitTime().equals(pendingClusteringInstant.get().getTimestamp()); + List pendingReplaceInstants = + metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + return pendingReplaceInstants.isEmpty() || pendingReplaceInstants.contains(baseFile.getCommitTime()); } /** From 82787b8461a0447467508cf2c7750870e57a8e8a Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 14 Feb 2022 16:12:23 +0800 Subject: [PATCH 3/6] add UTs --- .../view/AbstractTableFileSystemView.java | 3 +- .../view/TestHoodieTableFileSystemView.java | 215 ++++++++++++++++++ .../common/testutils/HoodieTestTable.java | 2 +- 3 files changed, 218 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 3d34ff4c5caf4..e393dd8029c68 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -389,7 +389,8 @@ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) { protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) { List pendingReplaceInstants = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - return pendingReplaceInstants.isEmpty() || pendingReplaceInstants.contains(baseFile.getCommitTime()); + + return !pendingReplaceInstants.isEmpty() && pendingReplaceInstants.contains(baseFile.getCommitTime()); } /** diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 924c6724e7b22..049494f3fb6dc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.CompactionOperation; @@ -41,6 +42,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -50,11 +52,13 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; @@ -1537,6 +1541,217 @@ public void testPendingClusteringOperations() throws IOException { assertFalse(fileIds.contains(fileId3)); } + /** + * + * create hoodie table like + * . + * ├── .hoodie + * │   ├── .aux + * │   │   └── .bootstrap + * │   │   ├── .fileids + * │   │   └── .partitions + * │   ├── .temp + * │   ├── 1.commit + * │   ├── 1.commit.requested + * │   ├── 1.inflight + * │   ├── 2.replacecommit + * │   ├── 2.replacecommit.inflight + * │   ├── 2.replacecommit.requested + * │   ├── 3.commit + * │   ├── 3.commit.requested + * │   ├── 3.inflight + * │   ├── archived + * │   └── hoodie.properties + * └── 2020 + * └── 06 + * └── 27 + * ├── 5fe477d2-0150-46d4-833c-1e9cc8da9948_1-0-1_3.parquet + * ├── 7e3208c8-fdec-4254-9682-8fff1e51ee8d_1-0-1_2.parquet + * ├── e04b0e2d-1467-46b2-8ea6-f4fe950965a5_1-0-1_1.parquet + * └── f3936b66-b3db-4fc8-a6d0-b1a7559016e6_1-0-1_1.parquet + * + * First test fsView API with finished clustering: + * 1. getLatestBaseFilesBeforeOrOn + * 2. getBaseFileOn + * 3. getLatestBaseFilesInRange + * 4. getAllBaseFiles + * 5. getLatestBaseFile + * + * Then remove 2.replacecommit, 1.commit, 1.commit.requested, 1.inflight to simulate + * pending clustering at the earliest position in the active timeline and test these APIs again. + * + * @throws IOException + */ + @Test + public void testHoodieTableFileSystemViewWithPendingClustering() throws IOException { + List latestBaseFilesBeforeOrOn; + Option baseFileOn; + List latestBaseFilesInRange; + List allBaseFiles; + Option latestBaseFile; + String partitionPath = "2020/06/27"; + new File(basePath + "/" + partitionPath).mkdirs(); + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + + // will create 5 fileId in partition. + // fileId1 and fileId2 will be replaced by fileID3 + // fileId4 and fileId5 will be committed after clustering finished. 
+ String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); + String fileId5 = UUID.randomUUID().toString(); + + assertFalse(roView.getLatestBaseFiles(partitionPath) + .anyMatch(dfile -> dfile.getFileId().equals(fileId1) + || dfile.getFileId().equals(fileId2) + || dfile.getFileId().equals(fileId3) + || dfile.getFileId().equals(fileId4) + || dfile.getFileId().equals(fileId5)), + "No commit, should not find any data file"); + + // first insert commit + String commitTime1 = "1"; + String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1); + String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2); + new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile(); + + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); + + // build writeStats + HashMap> partitionToFile1 = new HashMap<>(); + ArrayList files1 = new ArrayList<>(); + files1.add(fileId1); + files1.add(fileId2); + partitionToFile1.put(partitionPath, files1); + List writeStats1 = buildWriteStats(partitionToFile1, commitTime1); + + HoodieCommitMetadata commitMetadata1 = + CommitUtils.buildMetadata(writeStats1, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION); + saveAsComplete(commitTimeline, instant1, Option.of(commitMetadata1.toJsonString().getBytes(StandardCharsets.UTF_8))); + commitTimeline.reload(); + + // replace commit + String commitTime2 = "2"; + String fileName3 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId3); + new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile(); + + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime2); + Map> partitionToReplaceFileIds = 
new HashMap<>(); + List replacedFileIds = new ArrayList<>(); + replacedFileIds.add(fileId1); + replacedFileIds.add(fileId2); + partitionToReplaceFileIds.put(partitionPath, replacedFileIds); + + HashMap> partitionToFile2 = new HashMap<>(); + ArrayList files2 = new ArrayList<>(); + files2.add(fileId3); + partitionToFile2.put(partitionPath, files2); + List writeStats2 = buildWriteStats(partitionToFile2, commitTime2); + + HoodieCommitMetadata commitMetadata2 = + CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION); + saveAsComplete(commitTimeline, instant2, Option.of(commitMetadata2.toJsonString().getBytes(StandardCharsets.UTF_8))); + + // another insert commit + String commitTime3 = "3"; + String fileName4 = FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId4); + new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile(); + HoodieInstant instant3 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime3); + + // build writeStats + HashMap> partitionToFile3 = new HashMap<>(); + ArrayList files3 = new ArrayList<>(); + files3.add(fileId4); + partitionToFile3.put(partitionPath, files3); + List writeStats3 = buildWriteStats(partitionToFile3, commitTime3); + HoodieCommitMetadata commitMetadata3 = + CommitUtils.buildMetadata(writeStats3, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION); + saveAsComplete(commitTimeline, instant3, Option.of(commitMetadata3.toJsonString().getBytes(StandardCharsets.UTF_8))); + + metaClient.reloadActiveTimeline(); + refreshFsView(); + + ArrayList commits = new ArrayList<>(); + commits.add(commitTime1); + commits.add(commitTime2); + commits.add(commitTime3); + + // do check + latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(2, 
latestBaseFilesBeforeOrOn.size()); + assertTrue(latestBaseFilesBeforeOrOn.contains(fileId3)); + assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4)); + + // could see fileId3 because clustering is committed. + baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3); + assertTrue(baseFileOn.isPresent()); + assertEquals(baseFileOn.get().getFileId(), fileId3); + + latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(2, latestBaseFilesInRange.size()); + assertTrue(latestBaseFilesInRange.contains(fileId3)); + assertTrue(latestBaseFilesInRange.contains(fileId4)); + + allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(2, allBaseFiles.size()); + assertTrue(allBaseFiles.contains(fileId3)); + assertTrue(allBaseFiles.contains(fileId4)); + + // could see fileId3 because clustering is committed. + latestBaseFile = fsView.getLatestBaseFile(partitionPath, fileId3); + assertTrue(latestBaseFile.isPresent()); + assertEquals(latestBaseFile.get().getFileId(), fileId3); + + HoodieWrapperFileSystem fs = metaClient.getFs(); + fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false); + fs.delete(new Path(basePath + "/.hoodie", "1.inflight"), false); + fs.delete(new Path(basePath + "/.hoodie", "1.commit.requested"), false); + fs.delete(new Path(basePath + "/.hoodie", "2.replacecommit"), false); + + metaClient.reloadActiveTimeline(); + refreshFsView(); + // do check after delete some commit file + latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(3, latestBaseFilesBeforeOrOn.size()); + assertTrue(latestBaseFilesBeforeOrOn.contains(fileId1)); + assertTrue(latestBaseFilesBeforeOrOn.contains(fileId2)); + assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4)); + + // couldn't see fileId3 
because clustering is not committed. + baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3); + assertFalse(baseFileOn.isPresent()); + + latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(3, latestBaseFilesInRange.size()); + assertTrue(latestBaseFilesInRange.contains(fileId1)); + assertTrue(latestBaseFilesInRange.contains(fileId2)); + assertTrue(latestBaseFilesInRange.contains(fileId4)); + + allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(3, allBaseFiles.size()); + assertTrue(allBaseFiles.contains(fileId1)); + assertTrue(allBaseFiles.contains(fileId2)); + assertTrue(allBaseFiles.contains(fileId4)); + + // couldn't see fileId3 because clustering is not committed. + latestBaseFile = fsView.getLatestBaseFile(partitionPath, fileId3); + assertFalse(latestBaseFile.isPresent()); + + } + + + // Generate Hoodie WriteStat For Given Partition + private List buildWriteStats(HashMap> partitionToFileIds, String commitTime) { + HashMap>> maps = new HashMap<>(); + for (String partition : partitionToFileIds.keySet()) { + List> list = partitionToFileIds.get(partition).stream().map(fileId -> new ImmutablePair(fileId, 0)).collect(Collectors.toList()); + maps.put(partition, list); + } + return HoodieTestTable.generateHoodieWriteStatForPartition(maps, commitTime, false); + } + @Override protected HoodieTableType getTableType() { return HoodieTableType.MERGE_ON_READ; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index c55a389e268ca..f78312217eec2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -1044,7 +1044,7 @@ private static 
HoodieTestTableState getTestTableStateWithPartitionFileInfo(Write return testTableState; } - private static List generateHoodieWriteStatForPartition(Map>> partitionToFileIdMap, + public static List generateHoodieWriteStatForPartition(Map>> partitionToFileIdMap, String commitTime, boolean bootstrap) { List writeStats = new ArrayList<>(); for (Map.Entry>> entry : partitionToFileIdMap.entrySet()) { From 7af5cab11b0de6be344b06affad7cdff4285405d Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 14 Feb 2022 22:37:08 +0800 Subject: [PATCH 4/6] ut --- .../view/TestHoodieTableFileSystemView.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 049494f3fb6dc..23e5f1da104be 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -1575,7 +1575,7 @@ public void testPendingClusteringOperations() throws IOException { * 2. getBaseFileOn * 3. getLatestBaseFilesInRange * 4. getAllBaseFiles - * 5. getLatestBaseFile + * 5. getLatestBaseFiles * * Then remove 2.replacecommit, 1.commit, 1.commit.requested, 1.inflight to simulate * pending clustering at the earliest position in the active timeline and test these APIs again. 
@@ -1588,7 +1588,7 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept Option baseFileOn; List latestBaseFilesInRange; List allBaseFiles; - Option latestBaseFile; + List latestBaseFiles; String partitionPath = "2020/06/27"; new File(basePath + "/" + partitionPath).mkdirs(); HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); @@ -1700,9 +1700,10 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept assertTrue(allBaseFiles.contains(fileId4)); // could see fileId3 because clustering is committed. - latestBaseFile = fsView.getLatestBaseFile(partitionPath, fileId3); - assertTrue(latestBaseFile.isPresent()); - assertEquals(latestBaseFile.get().getFileId(), fileId3); + latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());; + assertEquals(2, latestBaseFiles.size()); + assertTrue(allBaseFiles.contains(fileId3)); + assertTrue(allBaseFiles.contains(fileId4)); HoodieWrapperFileSystem fs = metaClient.getFs(); fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false); @@ -1736,8 +1737,11 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept assertTrue(allBaseFiles.contains(fileId4)); // couldn't see fileId3 because clustering is not committed. 
- latestBaseFile = fsView.getLatestBaseFile(partitionPath, fileId3); - assertFalse(latestBaseFile.isPresent()); + latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());; + assertEquals(3, latestBaseFiles.size()); + assertTrue(allBaseFiles.contains(fileId1)); + assertTrue(allBaseFiles.contains(fileId2)); + assertTrue(allBaseFiles.contains(fileId4)); } From c9de44f309ad09ef92f3cd212262d18302422978 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 14 Feb 2022 22:44:13 +0800 Subject: [PATCH 5/6] check style --- .../hudi/common/table/view/TestHoodieTableFileSystemView.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 23e5f1da104be..8827e97cbf90f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -1700,7 +1700,7 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept assertTrue(allBaseFiles.contains(fileId4)); // could see fileId3 because clustering is committed. - latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());; + latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList()); assertEquals(2, latestBaseFiles.size()); assertTrue(allBaseFiles.contains(fileId3)); assertTrue(allBaseFiles.contains(fileId4)); @@ -1737,7 +1737,7 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept assertTrue(allBaseFiles.contains(fileId4)); // couldn't see fileId3 because clustering is not committed. 
- latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());; + latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList()); assertEquals(3, latestBaseFiles.size()); assertTrue(allBaseFiles.contains(fileId1)); assertTrue(allBaseFiles.contains(fileId2)); From e7f2297be7ee21f54a2600433537d3fcb83f3f7b Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 15 Feb 2022 09:39:55 +0800 Subject: [PATCH 6/6] fix uts --- .../table/action/commit/TestUpsertPartitioner.java | 12 ++++++------ .../table/view/TestHoodieTableFileSystemView.java | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 1e5f8029a7145..b81cf3d765d5d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -374,23 +374,23 @@ public void testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan() throws .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build(); FileCreateUtils.createRequestedReplaceCommit(basePath,"002", Option.of(requestedReplaceMetadata)); - // create file slice 002 - FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1); - FileCreateUtils.createCommit(basePath, "002"); + // create file slice 003 + FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "3", 1); + FileCreateUtils.createCommit(basePath, "003"); metaClient = HoodieTableMetaClient.reload(metaClient); // generate new data to be ingested HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - List 
insertRecords = dataGenerator.generateInserts("003", 100); + List insertRecords = dataGenerator.generateInserts("004", 100); WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords))); HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient); // create UpsertPartitioner UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config); - // for now we have file slice1 and file slice2 and file slice1 is contained in pending clustering plan - // So that only file slice2 can be used for ingestion. + // for now we have file slice1 and file slice3 and file slice1 is contained in pending clustering plan + // So that only file slice3 can be used for ingestion. assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested."); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 8827e97cbf90f..54bc138fc8f84 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -60,6 +60,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.BeforeEach; @@ -1589,6 +1590,7 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept List latestBaseFilesInRange; List allBaseFiles; List latestBaseFiles; + List latestBaseFilesPerPartition; String partitionPath = "2020/06/27"; new File(basePath + "/" + partitionPath).mkdirs(); HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); @@ -1705,6 +1707,12 @@ public void 
testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept assertTrue(allBaseFiles.contains(fileId3)); + assertTrue(allBaseFiles.contains(fileId4)); + + // could see fileId3 because clustering is committed. + latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(2, latestBaseFilesPerPartition.size()); + assertTrue(latestBaseFilesPerPartition.contains(fileId3)); + assertTrue(latestBaseFilesPerPartition.contains(fileId4)); + HoodieWrapperFileSystem fs = metaClient.getFs(); fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false); fs.delete(new Path(basePath + "/.hoodie", "1.inflight"), false); fs.delete(new Path(basePath + "/.hoodie", "1.commit.requested"), false); @@ -1743,6 +1751,12 @@ public void testHoodieTableFileSystemViewWithPendingClustering() throws IOExcept assertTrue(allBaseFiles.contains(fileId2)); assertTrue(allBaseFiles.contains(fileId4)); + + // couldn't see fileId3 because clustering is not committed. + latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList()); + assertEquals(3, latestBaseFilesPerPartition.size()); + assertTrue(latestBaseFilesPerPartition.contains(fileId1)); + assertTrue(latestBaseFilesPerPartition.contains(fileId2)); + assertTrue(latestBaseFilesPerPartition.contains(fileId4)); }