Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -141,9 +142,8 @@ public void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {
if (currentSnapshot == null) {
return;
}
java.util.Optional<String> totalDeleteFiles =
java.util.Optional.ofNullable(
currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
Optional<String> totalDeleteFiles =
Optional.ofNullable(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) > 0) {
// clear dangling delete files
cleanDanglingDeleteFiles();
Expand Down Expand Up @@ -175,36 +175,37 @@ void expireSnapshots(long mustOlderThan) {
private void expireSnapshots(long olderThan, Set<String> exclude) {
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
final AtomicInteger deleteFiles = new AtomicInteger(0);
Set<String> parentDirectory = new HashSet<>();
Set<String> parentDirectories = new HashSet<>();
Set<String> expiredFiles = new HashSet<>();
table
.expireSnapshots()
.retainLast(1)
.expireOlderThan(olderThan)
.deleteWith(
file -> {
try {
if (exclude.isEmpty()) {
arcticFileIO().deleteFile(file);
} else {
String fileUriPath = TableFileUtil.getUriPath(file);
if (!exclude.contains(fileUriPath)
&& !exclude.contains(new Path(fileUriPath).getParent().toString())) {
arcticFileIO().deleteFile(file);
}
if (exclude.isEmpty()) {
expiredFiles.add(file);
} else {
String fileUriPath = TableFileUtil.getUriPath(file);
if (!exclude.contains(fileUriPath)
&& !exclude.contains(new Path(fileUriPath).getParent().toString())) {
expiredFiles.add(file);
}
parentDirectory.add(new Path(file).getParent().toString());
deleteFiles.incrementAndGet();
} catch (Throwable t) {
LOG.warn("failed to delete file " + file, t);
} finally {
toDeleteFiles.incrementAndGet();
}

parentDirectories.add(new Path(file).getParent().toString());
toDeleteFiles.incrementAndGet();
})
.cleanExpiredFiles(true)
.cleanExpiredFiles(
true) /* enable clean only for collecting the expired files, will delete them later */
.commit();

parentDirectory.forEach(
// try to batch delete files
int deletedFiles =
TableFileUtil.parallelDeleteFiles(
arcticFileIO(), expiredFiles, ThreadPools.getWorkerPool());

parentDirectories.forEach(
parent -> {
try {
TableFileUtil.deleteEmptyDirectory(arcticFileIO(), parent, exclude);
Expand All @@ -214,11 +215,14 @@ private void expireSnapshots(long olderThan, Set<String> exclude) {
}
});

LOG.info(
"to delete {} files in {}, success delete {} files",
toDeleteFiles.get(),
getTable().name(),
deleteFiles.get());
runWithCondition(
toDeleteFiles.get() > 0,
() ->
LOG.info(
"To delete {} files in {}, success delete {} files",
toDeleteFiles.get(),
getTable().name(),
deletedFiles));
}

@Override
Expand Down Expand Up @@ -297,21 +301,23 @@ protected void cleanContentFiles(long lastTime) {
// so acquire in advance
// to prevent repeated acquisition
Set<String> validFiles = orphanFileCleanNeedToExcludeFiles();
LOG.info("{} start clean content files of change store", table.name());
int deleteFilesCnt = clearInternalTableContentsFiles(lastTime, validFiles);
LOG.info("{} total delete {} files from change store", table.name(), deleteFilesCnt);
LOG.info("{} start cleaning orphan files in content", table.name());
clearInternalTableContentsFiles(lastTime, validFiles);
}

protected void cleanMetadata(long lastTime) {
LOG.info("{} start clean metadata files", table.name());
int deleteFilesCnt = clearInternalTableMetadata(lastTime);
LOG.info("{} total delete {} metadata files", table.name(), deleteFilesCnt);
clearInternalTableMetadata(lastTime);
}

/**
 * Runs the dangling-delete-file cleanup for this table and reports the outcome.
 *
 * <p>Delegates to {@code clearInternalTableDanglingDeleteFiles()} and logs the count it returns.
 * The summary line is emitted once, and only when that count is greater than zero, so runs that
 * removed nothing do not add a "total delete 0" entry to the log.
 */
protected void cleanDanglingDeleteFiles() {
  LOG.info("{} start delete dangling delete files", table.name());
  int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles();
  // Single, conditional summary: an unconditional log here would duplicate this message
  // whenever files were deleted and make the condition below pointless.
  runWithCondition(
      danglingDeleteFilesCnt > 0,
      () ->
          LOG.info(
              "{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt));
}

protected long mustOlderThan(TableRuntime tableRuntime, long now) {
Expand Down Expand Up @@ -344,18 +350,28 @@ protected ArcticFileIO arcticFileIO() {
return (ArcticFileIO) table.io();
}

private int clearInternalTableContentsFiles(long lastTime, Set<String> exclude) {
private void clearInternalTableContentsFiles(long lastTime, Set<String> exclude) {
String dataLocation = table.location() + File.separator + DATA_FOLDER_NAME;
int slated = 0, deleted = 0;

try (ArcticFileIO io = arcticFileIO()) {
// listPrefix will not return the directory and the orphan file clean should clean the empty
// dir.
if (io.supportFileSystemOperations()) {
SupportsFileSystemOperations fio = io.asFileSystemIO();
return deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude);
Set<PathInfo> directories = new HashSet<>();
Set<String> filesToDelete =
deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude, directories);
slated = filesToDelete.size();
deleted = TableFileUtil.deleteFiles(io, filesToDelete);
/* delete empty directories */
deleteEmptyDirectories(fio, directories, lastTime, exclude);
} else if (io.supportPrefixOperations()) {
SupportsPrefixOperations pio = io.asPrefixFileIO();
return deleteInvalidFilesByPrefix(pio, dataLocation, lastTime, exclude);
Set<String> filesToDelete =
deleteInvalidFilesByPrefix(pio, dataLocation, lastTime, exclude);
slated = filesToDelete.size();
deleted = TableFileUtil.deleteFiles(io, filesToDelete);
} else {
LOG.warn(
String.format(
Expand All @@ -364,10 +380,19 @@ private int clearInternalTableContentsFiles(long lastTime, Set<String> exclude)
}
}

return 0;
final int finalCandidate = slated;
final int finalDeleted = deleted;
runWithCondition(
slated > 0,
() ->
LOG.info(
"{}: There were {} files slated for deletion and {} files were successfully deleted",
table.name(),
finalCandidate,
finalDeleted));
}

private int clearInternalTableMetadata(long lastTime) {
private void clearInternalTableMetadata(long lastTime) {
Set<String> validFiles = getValidMetadataFiles(table);
LOG.info("{} table getRuntime {} valid files", table.name(), validFiles.size());
Pattern excludeFileNameRegex = getExcludeFileNameRegex(table);
Expand All @@ -379,16 +404,26 @@ private int clearInternalTableMetadata(long lastTime) {
try (ArcticFileIO io = arcticFileIO()) {
if (io.supportPrefixOperations()) {
SupportsPrefixOperations pio = io.asPrefixFileIO();
return deleteInvalidMetadataFile(
pio, metadataLocation, lastTime, validFiles, excludeFileNameRegex);
Set<String> filesToDelete =
deleteInvalidMetadataFile(
pio, metadataLocation, lastTime, validFiles, excludeFileNameRegex);
int deleted = TableFileUtil.deleteFiles(io, filesToDelete);

runWithCondition(
!filesToDelete.isEmpty(),
() ->
LOG.info(
"{}: There were {} metadata files to be deleted and {} metadata files were successfully deleted",
table.name(),
filesToDelete.size(),
deleted));
} else {
LOG.warn(
String.format(
"Table %s doesn't support a fileIo with listDirectory or listPrefix, so skip clear files.",
table.name()));
}
}
return 0;
}

private int clearInternalTableDanglingDeleteFiles() {
Expand Down Expand Up @@ -473,50 +508,61 @@ public static long fetchLatestNonOptimizedSnapshotTime(Table table) {
return snapshot.map(Snapshot::timestampMillis).orElse(Long.MAX_VALUE);
}

private static int deleteInvalidFilesInFs(
SupportsFileSystemOperations fio, String location, long lastTime, Set<String> excludes) {
private Set<String> deleteInvalidFilesInFs(
SupportsFileSystemOperations fio,
String location,
long lastTime,
Set<String> excludes,
Set<PathInfo> directories) {
if (!fio.exists(location)) {
return 0;
return Collections.emptySet();
}

int deleteCount = 0;
Set<String> filesToDelete = new HashSet<>();
for (PathInfo p : fio.listDirectory(location)) {
String uriPath = TableFileUtil.getUriPath(p.location());
if (p.isDirectory()) {
int deleted = deleteInvalidFilesInFs(fio, p.location(), lastTime, excludes);
deleteCount += deleted;
if (fio.exists(p.location())
&& !p.location().endsWith(METADATA_FOLDER_NAME)
&& !p.location().endsWith(DATA_FOLDER_NAME)
&& p.createdAtMillis() < lastTime
&& fio.isEmptyDirectory(p.location())) {
TableFileUtil.deleteEmptyDirectory(fio, p.location(), excludes);
}
directories.add(p);
filesToDelete.addAll(
deleteInvalidFilesInFs(fio, p.location(), lastTime, excludes, directories));
} else {
String parentLocation = TableFileUtil.getParent(p.location());
String parentUriPath = TableFileUtil.getUriPath(parentLocation);
if (!excludes.contains(uriPath)
&& !excludes.contains(parentUriPath)
&& p.createdAtMillis() < lastTime) {
fio.deleteFile(p.location());
deleteCount += 1;
filesToDelete.add(p.location());
}
}
}
return deleteCount;
return filesToDelete;
}

/**
 * Removes the given directories that still exist, are empty, and were created before {@code
 * lastTime}.
 *
 * <p>Directories whose location ends with {@code METADATA_FOLDER_NAME} or {@code
 * DATA_FOLDER_NAME} are never removed. Candidates are visited longest-location first, so a
 * nested directory is handled before its ancestors (assuming a child's location string extends
 * its parent's). Iterating the set in its own, unspecified order could visit a parent while it
 * still contains an empty child and therefore skip it.
 *
 * @param fio file-system IO used for the existence and emptiness checks
 * @param paths candidate directories to examine
 * @param lastTime only directories whose {@code createdAtMillis()} is below this value are
 *     eligible
 * @param excludes forwarded unchanged to {@code TableFileUtil.deleteEmptyDirectory}
 */
private void deleteEmptyDirectories(
    SupportsFileSystemOperations fio, Set<PathInfo> paths, long lastTime, Set<String> excludes) {
  paths.stream()
      // Deepest (longest) locations first so children are cleaned before their parents.
      .sorted((a, b) -> Integer.compare(b.location().length(), a.location().length()))
      .forEachOrdered(
          p -> {
            if (fio.exists(p.location())
                && !p.location().endsWith(METADATA_FOLDER_NAME)
                && !p.location().endsWith(DATA_FOLDER_NAME)
                && p.createdAtMillis() < lastTime
                && fio.isEmptyDirectory(p.location())) {
              TableFileUtil.deleteEmptyDirectory(fio, p.location(), excludes);
            }
          });
}

private static int deleteInvalidFilesByPrefix(
private Set<String> deleteInvalidFilesByPrefix(
SupportsPrefixOperations pio, String prefix, long lastTime, Set<String> excludes) {
int deleteCount = 0;
Set<String> filesToDelete = new HashSet<>();
for (FileInfo fileInfo : pio.listPrefix(prefix)) {
String uriPath = TableFileUtil.getUriPath(fileInfo.location());
if (!excludes.contains(uriPath) && fileInfo.createdAtMillis() < lastTime) {
pio.deleteFile(fileInfo.location());
deleteCount += 1;
filesToDelete.add(fileInfo.location());
}
}
return deleteCount;

return filesToDelete;
}

private static Set<String> getValidMetadataFiles(Table internalTable) {
Expand Down Expand Up @@ -575,24 +621,24 @@ private static Pattern getExcludeFileNameRegex(Table table) {
return null;
}

private static int deleteInvalidMetadataFile(
private Set<String> deleteInvalidMetadataFile(
SupportsPrefixOperations pio,
String location,
long lastTime,
Set<String> exclude,
Pattern excludeRegex) {
int count = 0;
Set<String> filesToDelete = new HashSet<>();
for (FileInfo fileInfo : pio.listPrefix(location)) {
String uriPath = TableFileUtil.getUriPath(fileInfo.location());
if (!exclude.contains(uriPath)
&& fileInfo.createdAtMillis() < lastTime
&& (excludeRegex == null
|| !excludeRegex.matcher(TableFileUtil.getFileName(fileInfo.location())).matches())) {
pio.deleteFile(fileInfo.location());
count += 1;
filesToDelete.add(fileInfo.location());
}
}
return count;

return filesToDelete;
}

private static String formatTime(long timestamp) {
Expand All @@ -617,9 +663,7 @@ CloseableIterable<FileEntry> fileScan(
}
long deleteFileCnt =
Long.parseLong(
snapshot
.summary()
.getOrDefault(org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0"));
snapshot.summary().getOrDefault(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0"));
CloseableIterable<DataFile> dataFiles =
CloseableIterable.transform(tasks, ContentScanTask::file);
CloseableIterable<FileScanTask> hasDeleteTask =
Expand Down Expand Up @@ -936,4 +980,10 @@ public Literal<Long> getTsBound() {
return tsBound;
}
}

/**
 * Executes {@code fun} when {@code condition} is true and does nothing otherwise.
 *
 * @param condition whether the action should run
 * @param fun the action to run
 */
private void runWithCondition(boolean condition, Runnable fun) {
  if (!condition) {
    return;
  }
  fun.run();
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;

import java.util.concurrent.Callable;
Expand Down Expand Up @@ -61,6 +62,20 @@ default SupportsPrefixOperations asPrefixFileIO() {
}
}

/**
 * Determine if the fileIO supports bulk operations.
 *
 * <p>The default implementation returns {@code false}. {@code asBulkFileIO()} casts this
 * instance to {@link SupportsBulkOperations} when this method returns {@code true}, so an
 * implementation should only override it to return {@code true} if it also implements that
 * interface.
 *
 * @return {@code false} by default
 */
default boolean supportBulkOperations() {
return false;
}

/**
 * Views this fileIO as a {@link SupportsBulkOperations}.
 *
 * @return this instance cast to {@link SupportsBulkOperations}
 * @throws IllegalStateException if {@link #supportBulkOperations()} returns {@code false}
 */
default SupportsBulkOperations asBulkFileIO() {
  if (!supportBulkOperations()) {
    throw new IllegalStateException("Doesn't support bulk operations");
  }
  return (SupportsBulkOperations) this;
}

/** Returns true if this tableIo is an {@link SupportsFileSystemOperations} */
default boolean supportFileSystemOperations() {
return false;
Expand Down
Loading