Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -118,17 +120,23 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));

List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList());
Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
List<String> partitionsToDelete = new ArrayList<>();
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
// Handles at most 'cleanerParallelism' number of partitions once at a time to avoid overlarge memory pressure to the timeline server
// (remote or local embedded), thus to reduce the risk of an OOM exception.
List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only works when cleanerParallelism is smaller than the size of partitionsToClean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and this is also reasonable.

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

cleanOps.putAll(cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))));

partitionsToDelete.addAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList()));
}

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
boolean toDeletePartition = false;
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
Expand Down Expand Up @@ -329,7 +329,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
// all replaced file groups before earliestCommitToRetain are eligible to clean
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetain));
// add active files
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ protected Stream<FileSlice> getFileSlicesEligibleForClustering(String partition)
.collect(Collectors.toSet());
fgIdsInPendingCompactionLogCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()));

return hoodieTable.getSliceView().getLatestFileSlices(partition)
return hoodieTable.getSliceView().getLatestFileSlicesStateless(partition)
// file ids already in clustering are not eligible
.filter(slice -> !fgIdsInPendingCompactionLogCompactionAndClustering.contains(slice.getFileGroupId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);

List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
.getLatestFileSlicesStateless(partitionPath)
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange))
.map(s -> {
List<HoodieLogFile> logFiles = s.getLogFiles()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ protected Map<Pair<String, Path>, FileStatus[]> listPartitions(
return fileStatusMap;
}

/**
* Returns all files situated at the given partition.
*/
private FileStatus[] getAllFilesInPartition(String relativePartitionPath) throws IOException {
Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), relativePartitionPath);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = listPartition(partitionPath);
long endLsTs = System.currentTimeMillis();
LOG.debug("#files found in partition (" + relativePartitionPath + ") =" + statuses.length + ", Time taken ="
+ (endLsTs - beginLsTs));
return statuses;
}

/**
* Allows lazily loading the partitions if needed.
*
Expand All @@ -449,15 +462,7 @@ private void ensurePartitionLoadedCorrectly(String partition) {
// Not loaded yet
try {
LOG.info("Building file system view for partition (" + partitionPathStr + ")");

Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = listPartition(partitionPath);
long endLsTs = System.currentTimeMillis();
LOG.debug("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken ="
+ (endLsTs - beginLsTs));
List<HoodieFileGroup> groups = addFilesToView(statuses);

List<HoodieFileGroup> groups = addFilesToView(getAllFilesInPartition(partitionPathStr));
if (groups.isEmpty()) {
storePartitionView(partitionPathStr, new ArrayList<>());
}
Expand Down Expand Up @@ -598,35 +603,49 @@ private FileSlice filterUncommittedLogs(FileSlice fileSlice) {
}

protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
return addBootstrapBaseFileIfPresent(fileGroup, this::getBootstrapBaseFile);
}

protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup, Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> bootstrapBaseFileMappingFunc) {
boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices()
.anyMatch(fs -> fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS));
if (hasBootstrapBaseFile) {
HoodieFileGroup newFileGroup = new HoodieFileGroup(fileGroup);
newFileGroup.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS))
.forEach(fs -> fs.setBaseFile(
addBootstrapBaseFileIfPresent(fs.getFileGroupId(), fs.getBaseFile().get())));
addBootstrapBaseFileIfPresent(fs.getFileGroupId(), fs.getBaseFile().get(), bootstrapBaseFileMappingFunc)));
return newFileGroup;
}
return fileGroup;
}

protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice) {
return addBootstrapBaseFileIfPresent(fileSlice, this::getBootstrapBaseFile);
}

protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice, Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> bootstrapBaseFileMappingFunc) {
if (fileSlice.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
FileSlice copy = new FileSlice(fileSlice);
copy.getBaseFile().ifPresent(dataFile -> {
Option<BootstrapBaseFileMapping> edf = getBootstrapBaseFile(copy.getFileGroupId());
edf.ifPresent(e -> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
bootstrapBaseFileMappingFunc.apply(copy.getFileGroupId()).ifPresent(e -> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
});
return copy;
}
return fileSlice;
}

protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGroupId, HoodieBaseFile baseFile) {
return addBootstrapBaseFileIfPresent(fileGroupId, baseFile, this::getBootstrapBaseFile);
}

protected HoodieBaseFile addBootstrapBaseFileIfPresent(
HoodieFileGroupId fileGroupId,
HoodieBaseFile baseFile,
Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> bootstrapBaseFileMappingFunc) {
if (baseFile.getCommitTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
HoodieBaseFile copy = new HoodieBaseFile(baseFile);
Option<BootstrapBaseFileMapping> edf = getBootstrapBaseFile(fileGroupId);
edf.ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
bootstrapBaseFileMappingFunc.apply(fileGroupId).ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
return copy;
}
return baseFile;
Expand Down Expand Up @@ -706,7 +725,6 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
try {
readLock.lock();

List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
return formattedPartitionList.stream().collect(Collectors.toMap(
Function.identity(),
Expand Down Expand Up @@ -824,6 +842,31 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
}
}

@Override
public final Stream<FileSlice> getLatestFileSlicesStateless(String partitionStr) {
String partition = formatPartitionKey(partitionStr);
if (isPartitionAvailableInStore(partition)) {
return getLatestFileSlices(partition);
} else {
try {
Stream<FileSlice> fileSliceStream = buildFileGroups(getAllFilesInPartition(partition), visibleCommitsAndCompactionTimeline, true).stream()
.filter(fg -> !isFileGroupReplaced(fg))
.map(HoodieFileGroup::getLatestFileSlice)
.filter(Option::isPresent).map(Option::get)
.flatMap(slice -> this.filterUncommittedFiles(slice, true));
if (bootstrapIndex.useIndex()) {
final Map<HoodieFileGroupId, BootstrapBaseFileMapping> bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
if (!bootstrapBaseFileMappings.isEmpty()) {
return fileSliceStream.map(fileSlice -> addBootstrapBaseFileIfPresent(fileSlice, fileGroupId -> Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
}
}
return fileSliceStream;
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch all files in partition " + partition, e);
}
}
}

/**
* Get Latest File Slice for a given fileId in a given partition.
*/
Expand Down Expand Up @@ -1014,6 +1057,39 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

@Override
public final Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionStr) {
String partition = formatPartitionKey(partitionStr);
if (isPartitionAvailableInStore(partition)) {
return getAllFileGroups(partition);
} else {
try {
Stream<HoodieFileGroup> fileGroupStream = buildFileGroups(getAllFilesInPartition(partition), visibleCommitsAndCompactionTimeline, true).stream()
.filter(fg -> !isFileGroupReplaced(fg));
if (bootstrapIndex.useIndex()) {
final Map<HoodieFileGroupId, BootstrapBaseFileMapping> bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
if (!bootstrapBaseFileMappings.isEmpty()) {
return fileGroupStream.map(fileGroup -> addBootstrapBaseFileIfPresent(fileGroup, fileGroupId -> Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
}
}
return fileGroupStream;
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch all files in partition " + partition, e);
}
}
}

private Map<HoodieFileGroupId, BootstrapBaseFileMapping> getBootstrapBaseFileMappings(String partition) {
try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
LOG.info("Bootstrap Index available for partition " + partition);
List<BootstrapFileMapping> sourceFileMappings =
reader.getSourceFileMappingForPartition(partition);
return sourceFileMappings.stream()
.map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(),
s.getFileId()), s.getBootstrapFileStatus())).collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, s -> s));
}
}

private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
Expand All @@ -1029,22 +1105,38 @@ private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String p

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
}
return Stream.empty();
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
}
return Stream.empty();
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
}
return Stream.empty();
}

@Override
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
String partition = formatPartitionKey(partitionPath);
if (hasReplacedFilesInPartition(partition)) {
return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
}
return Stream.empty();
}

@Override
Expand Down Expand Up @@ -1263,6 +1355,11 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingLogCompac
*/
protected abstract void removeReplacedFileIdsAtInstants(Set<String> instants);

/**
* Returns whether there are replaced files within the given partition.
*/
protected abstract boolean hasReplacedFilesInPartition(String partitionPath);

/**
* Track instant time for file groups replaced.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ protected void removeReplacedFileIdsAtInstants(Set<String> instants) {
fgIdToReplaceInstants.entrySet().removeIf(entry -> instants.contains(entry.getValue().getTimestamp()));
}

@Override
protected boolean hasReplacedFilesInPartition(String partitionPath) {
return fgIdToReplaceInstants.keySet().stream().anyMatch(fg -> fg.getPartitionPath().equals(partitionPath));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also format the partitionPath to trim the / if possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

@Override
protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGroupId) {
return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
return execute(partitionPath, preferredView::getLatestFileSlices, secondaryView::getLatestFileSlices);
}

@Override
public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
return execute(partitionPath, preferredView::getLatestFileSlicesStateless, secondaryView::getLatestFileSlicesStateless);
}

@Override
public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
return execute(partitionPath, preferredView::getLatestUnCompactedFileSlices,
Expand Down Expand Up @@ -222,6 +227,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroupsStateless, secondaryView::getAllFileGroupsStateless);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not add APIs just for tesing purpose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But without this method, it seems difficult to confirm whether the partition metadata has been cleaned up.


@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
Expand Down
Loading