@@ -374,23 +374,23 @@ public void testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan() throws
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
FileCreateUtils.createRequestedReplaceCommit(basePath,"002", Option.of(requestedReplaceMetadata));

// create file slice 002
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1);
FileCreateUtils.createCommit(basePath, "002");
// create file slice 003
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "3", 1);
FileCreateUtils.createCommit(basePath, "003");

metaClient = HoodieTableMetaClient.reload(metaClient);

// generate new data to be ingested
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
- List<HoodieRecord> insertRecords = dataGenerator.generateInserts("003", 100);
+ List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 100);
WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));

HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
// create UpsertPartitioner
UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config);

- // for now we have file slice1 and file slice2 and file slice1 is contained in pending clustering plan
- // So that only file slice2 can be used for ingestion.
+ // for now we have file slice1 and file slice3 and file slice1 is contained in pending clustering plan
+ // So that only file slice3 can be used for ingestion.
assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested.");
}

@@ -380,6 +380,19 @@ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) {
&& baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
}

/**
* With async clustering, it is possible to see partial/complete base files due to an inflight
* clustering; ignore those base files.
*
* @param baseFile base file to check
*/
protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) {
List<String> pendingReplaceInstants =
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
Member:
Can we not reuse isPendingClusteringScheduledForFileId() or getPendingClusteringInstant()? We maintain a map of fgIdToPendingClustering which supports various methods; if we can reuse one of them, then we don't need to call the active timeline.

@zhangyue19921010 (Contributor, Author), Feb 18, 2022:
Emmm, maybe we can't use fgIdToPendingClustering to do the filtering here, because the files recorded in fgIdToPendingClustering are committed files and need to stay visible. What we need to filter out here are the in-flight, uncommitted data files produced by the clustering job.

So we need the instant time of xxxx.replacecommit.requested or xxxx.replacecommit.inflight and use it to filter out the uncommitted data files that clustering is creating, rather than the files that are about to be clustered. (A toy illustration follows this method.)

return !pendingReplaceInstants.isEmpty() && pendingReplaceInstants.contains(baseFile.getCommitTime());
}
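
To make the distinction in the thread above concrete, here is a self-contained toy model (not Hudi code; the class name and instant values are made up for illustration): the committed input file groups of a scheduled clustering stay visible, while base files whose commit time matches a pending replacecommit instant are treated as partial clustering output and hidden.

import java.util.Arrays;
import java.util.List;

// Toy model (NOT Hudi code; names and instant values are illustrative only).
// The input file groups of a scheduled clustering were written by completed commits and stay
// visible; only base files stamped with a pending replacecommit instant are hidden.
public class PendingClusteringFilterSketch {
  public static void main(String[] args) {
    // "2".replacecommit is still requested/inflight
    List<String> pendingReplaceInstants = Arrays.asList("2");
    // commit times of the base files found on storage
    List<String> baseFileCommitTimes = Arrays.asList("1", "2", "3");

    baseFileCommitTimes.stream()
        .filter(commitTime -> !pendingReplaceInstants.contains(commitTime)) // drop clustering output
        .forEach(commitTime -> System.out.println("visible base file from instant " + commitTime));
    // prints instants 1 and 3; the half-written file from instant 2 stays hidden
  }
}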

/**
* Returns true if the file-group is under pending-compaction and the file-slice's baseInstant matches compaction
* Instant.
@@ -492,7 +505,7 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
.map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
))
- .filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()))
+ .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()))
Contributor:
Shouldn't the caller pass in the right maxCommitTime to filter out the pending base files?

Contributor (Author):
When the inflight clustering sits at the earliest instant of the active timeline, this bug can happen (based on the code below), so no matter what maxCommitTime is, we can't filter it out.
We currently treat containsOrBeforeTimelineStarts as committed, which may include unfinished clustering data when that inflight clustering instant is at the earliest position of the active timeline (see the toy model after this hunk):

private boolean isFileSliceCommitted(FileSlice slice) {

.filter(Option::isPresent).map(Option::get)
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
} finally {
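
The toy model referenced in the reply above (not the Hudi source; names and values are illustrative): once older completed instants have been archived, the pending replacecommit becomes the earliest instant of the active timeline, and a containsOrBeforeTimelineStarts-style committed check accepts its half-written output no matter which maxCommitTime the caller passes.

import java.util.Arrays;
import java.util.List;

// Toy model (NOT the Hudi source). Commit "1" has been archived, replacecommit "2" is still
// inflight, and commit "3" is the only completed instant left on the active timeline.
public class EarliestPendingInstantSketch {
  public static void main(String[] args) {
    List<String> completedInstants = Arrays.asList("3");
    String timelineStart = completedInstants.get(0); // "3"

    String clusteringOutputCommitTime = "2"; // base file written by the inflight clustering
    boolean looksCommitted = completedInstants.contains(clusteringOutputCommitTime)
        || clusteringOutputCommitTime.compareTo(timelineStart) < 0; // falls before the timeline start

    // true: the partial file passes the committed check regardless of maxCommitTime,
    // which is why the explicit isBaseFileDueToPendingClustering filter is added.
    System.out.println(looksCommitted);
  }
}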
@@ -511,7 +524,7 @@ public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String in
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
- instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
+ instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
}
} finally {
@@ -547,7 +560,7 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
- && !isBaseFileDueToPendingCompaction(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
+ && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
.map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
} finally {
readLock.unlock();
@@ -563,7 +576,7 @@ public final Stream<HoodieBaseFile> getAllBaseFiles(String partitionStr) {
return fetchAllBaseFiles(partitionPath)
.filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId()))
.filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
- .filter(df -> !isBaseFileDueToPendingCompaction(df))
+ .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
} finally {
readLock.unlock();
@@ -953,7 +966,7 @@ public Stream<HoodieBaseFile> fetchLatestBaseFiles(final String partitionPath) {

protected Option<HoodieBaseFile> getLatestBaseFile(HoodieFileGroup fileGroup) {
return Option
- .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst());
+ .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst());
}

/**
@@ -31,6 +31,7 @@
import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.CompactionOperation;
@@ -41,6 +42,7 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -50,12 +52,15 @@
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;
@@ -1537,6 +1542,234 @@ public void testPendingClusteringOperations() throws IOException {
assertFalse(fileIds.contains(fileId3));
}

/**
*
* Create a hoodie table like:
* .
* ├── .hoodie
* │   ├── .aux
* │   │   └── .bootstrap
* │   │   ├── .fileids
* │   │   └── .partitions
* │   ├── .temp
* │   ├── 1.commit
* │   ├── 1.commit.requested
* │   ├── 1.inflight
* │   ├── 2.replacecommit
* │   ├── 2.replacecommit.inflight
* │   ├── 2.replacecommit.requested
* │   ├── 3.commit
* │   ├── 3.commit.requested
* │   ├── 3.inflight
* │   ├── archived
* │   └── hoodie.properties
* └── 2020
* └── 06
* └── 27
* ├── 5fe477d2-0150-46d4-833c-1e9cc8da9948_1-0-1_3.parquet
* ├── 7e3208c8-fdec-4254-9682-8fff1e51ee8d_1-0-1_2.parquet
* ├── e04b0e2d-1467-46b2-8ea6-f4fe950965a5_1-0-1_1.parquet
* └── f3936b66-b3db-4fc8-a6d0-b1a7559016e6_1-0-1_1.parquet
*
* First test fsView API with finished clustering:
* 1. getLatestBaseFilesBeforeOrOn
* 2. getBaseFileOn
* 3. getLatestBaseFilesInRange
* 4. getAllBaseFiles
* 5. getLatestBaseFiles
Member:
What about other base-file related APIs like fetchLatestBaseFiles and fetchAllBaseFiles? Are they all covered by this change?
PS: I think we should take a follow-up task to make the FSView APIs more uniform.

Contributor (Author):
Yep, there are quite a few getXxxLatestXxx() methods, hhh.
The root change here is to add isBaseFileDueToPendingClustering, analogous to isBaseFileDueToPendingCompaction, and to add this check wherever isBaseFileDueToPendingCompaction is used.
The APIs in this UT are all of the affected APIs.

*
* Then remove 2.replacecommit, 1.commit, 1.commit.requested, 1.inflight to simulate
* pending clustering at the earliest position in the active timeline and test these APIs again.
*
* @throws IOException
*/
@Test
public void testHoodieTableFileSystemViewWithPendingClustering() throws IOException {
List<String> latestBaseFilesBeforeOrOn;
Option<HoodieBaseFile> baseFileOn;
List<String> latestBaseFilesInRange;
List<String> allBaseFiles;
List<String> latestBaseFiles;
List<String> latestBaseFilesPerPartition;
String partitionPath = "2020/06/27";
new File(basePath + "/" + partitionPath).mkdirs();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();

// will create 5 fileId in partition.
// fileId1 and fileId2 will be replaced by fileID3
// fileId4 and fileId5 will be committed after clustering finished.
String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
String fileId4 = UUID.randomUUID().toString();
String fileId5 = UUID.randomUUID().toString();

assertFalse(roView.getLatestBaseFiles(partitionPath)
.anyMatch(dfile -> dfile.getFileId().equals(fileId1)
|| dfile.getFileId().equals(fileId2)
|| dfile.getFileId().equals(fileId3)
|| dfile.getFileId().equals(fileId4)
|| dfile.getFileId().equals(fileId5)),
"No commit, should not find any data file");

// first insert commit
String commitTime1 = "1";
String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();

HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);

// build writeStats
HashMap<String, List<String>> partitionToFile1 = new HashMap<>();
ArrayList<String> files1 = new ArrayList<>();
files1.add(fileId1);
files1.add(fileId2);
partitionToFile1.put(partitionPath, files1);
List<HoodieWriteStat> writeStats1 = buildWriteStats(partitionToFile1, commitTime1);

HoodieCommitMetadata commitMetadata1 =
CommitUtils.buildMetadata(writeStats1, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION);
saveAsComplete(commitTimeline, instant1, Option.of(commitMetadata1.toJsonString().getBytes(StandardCharsets.UTF_8)));
commitTimeline.reload();

// replace commit
String commitTime2 = "2";
String fileName3 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId3);
new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile();

HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime2);
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
List<String> replacedFileIds = new ArrayList<>();
replacedFileIds.add(fileId1);
replacedFileIds.add(fileId2);
partitionToReplaceFileIds.put(partitionPath, replacedFileIds);

HashMap<String, List<String>> partitionToFile2 = new HashMap<>();
ArrayList<String> files2 = new ArrayList<>();
files2.add(fileId3);
partitionToFile2.put(partitionPath, files2);
List<HoodieWriteStat> writeStats2 = buildWriteStats(partitionToFile2, commitTime2);

HoodieCommitMetadata commitMetadata2 =
CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION);
saveAsComplete(commitTimeline, instant2, Option.of(commitMetadata2.toJsonString().getBytes(StandardCharsets.UTF_8)));

// another insert commit
String commitTime3 = "3";
String fileName4 = FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId4);
new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile();
HoodieInstant instant3 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime3);

// build writeStats
HashMap<String, List<String>> partitionToFile3 = new HashMap<>();
ArrayList<String> files3 = new ArrayList<>();
files3.add(fileId4);
partitionToFile3.put(partitionPath, files3);
List<HoodieWriteStat> writeStats3 = buildWriteStats(partitionToFile3, commitTime3);
HoodieCommitMetadata commitMetadata3 =
CommitUtils.buildMetadata(writeStats3, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION);
saveAsComplete(commitTimeline, instant3, Option.of(commitMetadata3.toJsonString().getBytes(StandardCharsets.UTF_8)));

metaClient.reloadActiveTimeline();
refreshFsView();

ArrayList<String> commits = new ArrayList<>();
commits.add(commitTime1);
commits.add(commitTime2);
commits.add(commitTime3);

// do check
latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFilesBeforeOrOn.size());
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId3));
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));

// could see fileId3 because clustering is committed.
baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
assertTrue(baseFileOn.isPresent());
assertEquals(baseFileOn.get().getFileId(), fileId3);

latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFilesInRange.size());
assertTrue(latestBaseFilesInRange.contains(fileId3));
assertTrue(latestBaseFilesInRange.contains(fileId4));

allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, allBaseFiles.size());
assertTrue(allBaseFiles.contains(fileId3));
assertTrue(allBaseFiles.contains(fileId4));

// could see fileId3 because clustering is committed.
latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFiles.size());
assertTrue(latestBaseFiles.contains(fileId3));
assertTrue(latestBaseFiles.contains(fileId4));

// could see fileId3 because clustering is committed.
latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFilesPerPartition.size());
assertTrue(latestBaseFilesPerPartition.contains(fileId3));
assertTrue(latestBaseFilesPerPartition.contains(fileId4));

HoodieWrapperFileSystem fs = metaClient.getFs();
fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false);
fs.delete(new Path(basePath + "/.hoodie", "1.inflight"), false);
fs.delete(new Path(basePath + "/.hoodie", "1.commit.requested"), false);
fs.delete(new Path(basePath + "/.hoodie", "2.replacecommit"), false);

metaClient.reloadActiveTimeline();
refreshFsView();
// do check after delete some commit file
latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFilesBeforeOrOn.size());
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId1));
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId2));
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));

// couldn't see fileId3 because clustering is not committed.
baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
assertFalse(baseFileOn.isPresent());

latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFilesInRange.size());
assertTrue(latestBaseFilesInRange.contains(fileId1));
assertTrue(latestBaseFilesInRange.contains(fileId2));
assertTrue(latestBaseFilesInRange.contains(fileId4));

allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, allBaseFiles.size());
assertTrue(allBaseFiles.contains(fileId1));
assertTrue(allBaseFiles.contains(fileId2));
assertTrue(allBaseFiles.contains(fileId4));

// couldn't see fileId3 because clustering is not committed.
latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFiles.size());
assertTrue(latestBaseFiles.contains(fileId1));
assertTrue(latestBaseFiles.contains(fileId2));
assertTrue(latestBaseFiles.contains(fileId4));

// couldn't see fileId3 because clustering is not committed.
latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFilesPerPartition.size());
assertTrue(latestBaseFilesPerPartition.contains(fileId1));
assertTrue(latestBaseFilesPerPartition.contains(fileId2));
assertTrue(latestBaseFilesPerPartition.contains(fileId4));
}


// Generate Hoodie WriteStat For Given Partition
private List<HoodieWriteStat> buildWriteStats(HashMap<String, List<String>> partitionToFileIds, String commitTime) {
HashMap<String, List<Pair<String, Integer>>> maps = new HashMap<>();
for (String partition : partitionToFileIds.keySet()) {
List<Pair<String, Integer>> list = partitionToFileIds.get(partition).stream().map(fileId -> new ImmutablePair<String, Integer>(fileId, 0)).collect(Collectors.toList());
maps.put(partition, list);
}
return HoodieTestTable.generateHoodieWriteStatForPartition(maps, commitTime, false);
}

@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
Expand Down
@@ -1044,7 +1044,7 @@ private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(Write
return testTableState;
}

- private static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
+ public static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
String commitTime, boolean bootstrap) {
List<HoodieWriteStat> writeStats = new ArrayList<>();
for (Map.Entry<String, List<Pair<String, Integer>>> entry : partitionToFileIdMap.entrySet()) {