diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
index ae9f181e02..e417fe8786 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -62,6 +62,7 @@
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,9 +142,8 @@ public void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {
     if (currentSnapshot == null) {
       return;
     }
-    java.util.Optional<String> totalDeleteFiles =
-        java.util.Optional.ofNullable(
-            currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
+    Optional<String> totalDeleteFiles =
+        Optional.ofNullable(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
     if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get()) > 0) {
       // clear dangling delete files
       cleanDanglingDeleteFiles();
@@ -175,36 +175,37 @@ void expireSnapshots(long mustOlderThan) {
   private void expireSnapshots(long olderThan, Set<String> exclude) {
     LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
     final AtomicInteger toDeleteFiles = new AtomicInteger(0);
-    final AtomicInteger deleteFiles = new AtomicInteger(0);
-    Set<String> parentDirectory = new HashSet<>();
+    Set<String> parentDirectories = new HashSet<>();
+    Set<String> expiredFiles = new HashSet<>();
     table
         .expireSnapshots()
         .retainLast(1)
         .expireOlderThan(olderThan)
         .deleteWith(
             file -> {
-              try {
-                if (exclude.isEmpty()) {
-                  arcticFileIO().deleteFile(file);
-                } else {
-                  String fileUriPath = TableFileUtil.getUriPath(file);
-                  if (!exclude.contains(fileUriPath)
-                      && !exclude.contains(new Path(fileUriPath).getParent().toString())) {
-                    arcticFileIO().deleteFile(file);
-                  }
+              if (exclude.isEmpty()) {
+                expiredFiles.add(file);
+              } else {
+                String fileUriPath = TableFileUtil.getUriPath(file);
+                if (!exclude.contains(fileUriPath)
+                    && !exclude.contains(new Path(fileUriPath).getParent().toString())) {
+                  expiredFiles.add(file);
                 }
-                parentDirectory.add(new Path(file).getParent().toString());
-                deleteFiles.incrementAndGet();
-              } catch (Throwable t) {
-                LOG.warn("failed to delete file " + file, t);
-              } finally {
-                toDeleteFiles.incrementAndGet();
               }
+
+              parentDirectories.add(new Path(file).getParent().toString());
+              toDeleteFiles.incrementAndGet();
             })
-        .cleanExpiredFiles(true)
+        .cleanExpiredFiles(
+            true) /* enable clean only to collect the expired files; they are deleted in batch below */
         .commit();
 
-    parentDirectory.forEach(
+    // try to batch delete files
+    int deletedFiles =
+        TableFileUtil.parallelDeleteFiles(
+            arcticFileIO(), expiredFiles, ThreadPools.getWorkerPool());
+
+    parentDirectories.forEach(
         parent -> {
           try {
             TableFileUtil.deleteEmptyDirectory(arcticFileIO(), parent, exclude);
@@ -214,11 +215,14 @@ private void expireSnapshots(long olderThan, Set<String> exclude) {
           }
         });
 
-    LOG.info(
-        "to delete {} files in {}, success delete {} files",
-        toDeleteFiles.get(),
-        getTable().name(),
-        deleteFiles.get());
+    runWithCondition(
+        toDeleteFiles.get() > 0,
+        () ->
+            LOG.info(
+                "To delete {} files in {}, successfully deleted {} files",
+                toDeleteFiles.get(),
+                getTable().name(),
+                deletedFiles));
   }
 
   @Override
@@ -297,21 +301,23 @@ protected void cleanContentFiles(long lastTime) {
     // so acquire in advance
     // to prevent repeated acquisition
     Set<String> validFiles = orphanFileCleanNeedToExcludeFiles();
-    LOG.info("{} start clean content files of change store", table.name());
-    int deleteFilesCnt = clearInternalTableContentsFiles(lastTime, validFiles);
-    LOG.info("{} total delete {} files from change store", table.name(), deleteFilesCnt);
+    LOG.info("{} start cleaning orphan content files", table.name());
+    clearInternalTableContentsFiles(lastTime, validFiles);
   }
 
   protected void cleanMetadata(long lastTime) {
     LOG.info("{} start clean metadata files", table.name());
-    int deleteFilesCnt = clearInternalTableMetadata(lastTime);
-    LOG.info("{} total delete {} metadata files", table.name(), deleteFilesCnt);
+    clearInternalTableMetadata(lastTime);
   }
 
   protected void cleanDanglingDeleteFiles() {
     LOG.info("{} start delete dangling delete files", table.name());
     int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles();
-    LOG.info("{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt);
+    runWithCondition(
+        danglingDeleteFilesCnt > 0,
+        () ->
+            LOG.info(
+                "{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt));
   }
 
   protected long mustOlderThan(TableRuntime tableRuntime, long now) {
@@ -344,18 +350,28 @@ protected ArcticFileIO arcticFileIO() {
     return (ArcticFileIO) table.io();
   }
 
-  private int clearInternalTableContentsFiles(long lastTime, Set<String> exclude) {
+  private void clearInternalTableContentsFiles(long lastTime, Set<String> exclude) {
     String dataLocation = table.location() + File.separator + DATA_FOLDER_NAME;
+    int slated = 0, deleted = 0;
     try (ArcticFileIO io = arcticFileIO()) {
       // listPrefix will not return the directory and the orphan file clean should clean the empty
       // dir.
       if (io.supportFileSystemOperations()) {
         SupportsFileSystemOperations fio = io.asFileSystemIO();
-        return deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude);
+        Set<PathInfo> directories = new HashSet<>();
+        Set<String> filesToDelete =
+            deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude, directories);
+        slated = filesToDelete.size();
+        deleted = TableFileUtil.deleteFiles(io, filesToDelete);
+        /* delete empty directories */
+        deleteEmptyDirectories(fio, directories, lastTime, exclude);
       } else if (io.supportPrefixOperations()) {
         SupportsPrefixOperations pio = io.asPrefixFileIO();
-        return deleteInvalidFilesByPrefix(pio, dataLocation, lastTime, exclude);
+        Set<String> filesToDelete =
+            deleteInvalidFilesByPrefix(pio, dataLocation, lastTime, exclude);
+        slated = filesToDelete.size();
+        deleted = TableFileUtil.deleteFiles(io, filesToDelete);
       } else {
         LOG.warn(
             String.format(
@@ -364,10 +380,19 @@ private int clearInternalTableContentsFiles(long lastTime, Set<String> exclude)
       }
     }
 
-    return 0;
+    final int finalCandidate = slated;
+    final int finalDeleted = deleted;
+    runWithCondition(
+        slated > 0,
+        () ->
+            LOG.info(
+                "{}: There were {} files slated for deletion and {} files were successfully deleted",
+                table.name(),
+                finalCandidate,
+                finalDeleted));
   }
 
-  private int clearInternalTableMetadata(long lastTime) {
+  private void clearInternalTableMetadata(long lastTime) {
     Set<String> validFiles = getValidMetadataFiles(table);
     LOG.info("{} table getRuntime {} valid files", table.name(), validFiles.size());
     Pattern excludeFileNameRegex = getExcludeFileNameRegex(table);
@@ -379,8 +404,19 @@ private int clearInternalTableMetadata(long lastTime) {
     try (ArcticFileIO io = arcticFileIO()) {
       if (io.supportPrefixOperations()) {
         SupportsPrefixOperations pio = io.asPrefixFileIO();
-        return deleteInvalidMetadataFile(
-            pio, metadataLocation, lastTime, validFiles, excludeFileNameRegex);
+        Set<String> filesToDelete =
+            deleteInvalidMetadataFile(
+                pio, metadataLocation, lastTime, validFiles, excludeFileNameRegex);
+        int deleted = TableFileUtil.deleteFiles(io, filesToDelete);
+
+        runWithCondition(
+            !filesToDelete.isEmpty(),
+            () ->
+                LOG.info(
+                    "{}: There were {} metadata files to be deleted and {} metadata files were successfully deleted",
+                    table.name(),
+                    filesToDelete.size(),
+                    deleted));
       } else {
         LOG.warn(
             String.format(
@@ -388,7 +424,6 @@ private int clearInternalTableMetadata(long lastTime) {
                 table.name()));
       }
     }
-    return 0;
   }
 
   private int clearInternalTableDanglingDeleteFiles() {
@@ -473,50 +508,61 @@ public static long fetchLatestNonOptimizedSnapshotTime(Table table) {
     return snapshot.map(Snapshot::timestampMillis).orElse(Long.MAX_VALUE);
   }
 
-  private static int deleteInvalidFilesInFs(
-      SupportsFileSystemOperations fio, String location, long lastTime, Set<String> excludes) {
+  private Set<String> deleteInvalidFilesInFs(
+      SupportsFileSystemOperations fio,
+      String location,
+      long lastTime,
+      Set<String> excludes,
+      Set<PathInfo> directories) {
     if (!fio.exists(location)) {
-      return 0;
+      return Collections.emptySet();
     }
 
-    int deleteCount = 0;
+    Set<String> filesToDelete = new HashSet<>();
     for (PathInfo p : fio.listDirectory(location)) {
       String uriPath = TableFileUtil.getUriPath(p.location());
       if (p.isDirectory()) {
-        int deleted = deleteInvalidFilesInFs(fio, p.location(), lastTime, excludes);
-        deleteCount += deleted;
-        if (fio.exists(p.location())
-            && !p.location().endsWith(METADATA_FOLDER_NAME)
-            && !p.location().endsWith(DATA_FOLDER_NAME)
-            && p.createdAtMillis() < lastTime
-            && fio.isEmptyDirectory(p.location())) {
-          TableFileUtil.deleteEmptyDirectory(fio, p.location(), excludes);
-        }
+        directories.add(p);
+        filesToDelete.addAll(
+            deleteInvalidFilesInFs(fio, p.location(), lastTime, excludes, directories));
       } else {
         String parentLocation = TableFileUtil.getParent(p.location());
         String parentUriPath = TableFileUtil.getUriPath(parentLocation);
         if (!excludes.contains(uriPath)
             && !excludes.contains(parentUriPath)
             && p.createdAtMillis() < lastTime) {
-          fio.deleteFile(p.location());
-          deleteCount += 1;
+          filesToDelete.add(p.location());
         }
       }
     }
-    return deleteCount;
+
+    return filesToDelete;
+  }
+
+  private void deleteEmptyDirectories(
+      SupportsFileSystemOperations fio, Set<PathInfo> paths, long lastTime, Set<String> excludes) {
+    paths.forEach(
+        p -> {
+          if (fio.exists(p.location())
+              && !p.location().endsWith(METADATA_FOLDER_NAME)
+              && !p.location().endsWith(DATA_FOLDER_NAME)
+              && p.createdAtMillis() < lastTime
+              && fio.isEmptyDirectory(p.location())) {
+            TableFileUtil.deleteEmptyDirectory(fio, p.location(), excludes);
+          }
+        });
   }
 
-  private static int deleteInvalidFilesByPrefix(
+  private Set<String> deleteInvalidFilesByPrefix(
       SupportsPrefixOperations pio, String prefix, long lastTime, Set<String> excludes) {
-    int deleteCount = 0;
+    Set<String> filesToDelete = new HashSet<>();
     for (FileInfo fileInfo : pio.listPrefix(prefix)) {
       String uriPath = TableFileUtil.getUriPath(fileInfo.location());
       if (!excludes.contains(uriPath) && fileInfo.createdAtMillis() < lastTime) {
-        pio.deleteFile(fileInfo.location());
-        deleteCount += 1;
+        filesToDelete.add(fileInfo.location());
       }
     }
-    return deleteCount;
+
+    return filesToDelete;
   }
 
   private static Set<String> getValidMetadataFiles(Table internalTable) {
@@ -575,24 +621,24 @@ private static Pattern getExcludeFileNameRegex(Table table) {
     return null;
   }
 
-  private static int deleteInvalidMetadataFile(
+  private Set<String> deleteInvalidMetadataFile(
       SupportsPrefixOperations pio,
       String location,
       long lastTime,
       Set<String> exclude,
       Pattern excludeRegex) {
-    int count = 0;
+    Set<String> filesToDelete = new HashSet<>();
     for (FileInfo fileInfo : pio.listPrefix(location)) {
       String uriPath = TableFileUtil.getUriPath(fileInfo.location());
       if (!exclude.contains(uriPath)
           && fileInfo.createdAtMillis() < lastTime
          && (excludeRegex == null
              || !excludeRegex.matcher(TableFileUtil.getFileName(fileInfo.location())).matches())) {
-        pio.deleteFile(fileInfo.location());
-        count += 1;
+        filesToDelete.add(fileInfo.location());
       }
     }
-    return count;
+
+    return filesToDelete;
   }
 
   private static String formatTime(long timestamp) {
@@ -617,9 +663,7 @@ CloseableIterable<FileEntry> fileScan(
     }
     long deleteFileCnt =
         Long.parseLong(
-            snapshot
-                .summary()
-                .getOrDefault(org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0"));
+            snapshot.summary().getOrDefault(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0"));
     CloseableIterable<DataFile> dataFiles =
         CloseableIterable.transform(tasks, ContentScanTask::file);
     CloseableIterable<FileScanTask> hasDeleteTask =
@@ -936,4 +980,10 @@ public Literal<Long> getTsBound() {
       return tsBound;
     }
   }
+
+  private void runWithCondition(boolean condition, Runnable fun) {
+    if (condition) {
+      fun.run();
+    }
+  }
 }
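Note on the IcebergTableMaintainer change above: `deleteWith` no longer deletes inline; it only records each expired path, and a single `TableFileUtil.parallelDeleteFiles` call performs the deletions afterwards (in bulk where the FileIO supports it). A minimal sketch of that collect-then-batch pattern, assuming an Iceberg `Table` plus the helpers from this patch; the class and method below are illustrative, not part of the change, and the exclude filtering from the real code is omitted:

import java.util.HashSet;
import java.util.Set;

import org.apache.iceberg.Table;
import org.apache.iceberg.util.ThreadPools;

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.utils.TableFileUtil;

class BatchExpireSketch {
  /** Expire snapshots, collecting expired file paths instead of deleting them one by one. */
  static int expire(Table table, ArcticFileIO io, long olderThan) {
    Set<String> expiredFiles = new HashSet<>();
    table
        .expireSnapshots()
        .retainLast(1)
        .expireOlderThan(olderThan)
        .deleteWith(expiredFiles::add) // record the path; do not delete inline
        .cleanExpiredFiles(true)
        .commit();
    // one batched call replaces a deleteFile() round trip per expired file
    return TableFileUtil.parallelDeleteFiles(io, expiredFiles, ThreadPools.getWorkerPool());
  }
}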
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
index 3f36778553..c3b97deec9 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
@@ -20,6 +20,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 
 import java.util.concurrent.Callable;
@@ -61,6 +62,20 @@ default SupportsPrefixOperations asPrefixFileIO() {
     }
   }
 
+  /** Determine if the fileIO supports bulk operations. */
+  default boolean supportBulkOperations() {
+    return false;
+  }
+
+  /** Return this fileIO as a {@link SupportsBulkOperations} if it is an instance of that type. */
+  default SupportsBulkOperations asBulkFileIO() {
+    if (supportBulkOperations()) {
+      return (SupportsBulkOperations) this;
+    } else {
+      throw new IllegalStateException("Doesn't support bulk operations");
+    }
+  }
+
   /** Returns true if this tableIo is an {@link SupportsFileSystemOperations} */
   default boolean supportFileSystemOperations() {
     return false;
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java
index 201845ee4f..78a1507b8a 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIOAdapter.java
@@ -21,6 +21,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
@@ -67,6 +68,17 @@ public SupportsPrefixOperations asPrefixFileIO() {
     return (SupportsPrefixOperations) io;
   }
 
+  @Override
+  public boolean supportBulkOperations() {
+    return io instanceof SupportsBulkOperations;
+  }
+
+  @Override
+  public SupportsBulkOperations asBulkFileIO() {
+    Preconditions.checkArgument(supportBulkOperations());
+    return (SupportsBulkOperations) io;
+  }
+
   @Override
   public boolean supportFileSystemOperations() {
     return io instanceof ArcticFileIO && ((ArcticFileIO) io).supportFileSystemOperations();
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
index 6f11a23fb1..26297b8da9 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -234,11 +235,25 @@ public void deletePrefix(String prefix) {
         });
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    tableMetaStore.doAs(
+        () -> {
+          super.deleteFiles(pathsToDelete);
+          return null;
+        });
+  }
+
   @Override
   public boolean supportPrefixOperations() {
     return true;
   }
 
+  @Override
+  public boolean supportBulkOperations() {
+    return true;
+  }
+
   @Override
   public boolean supportFileSystemOperations() {
     return true;
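The three FileIO changes above extend the module's existing capability-probe convention (`supportPrefixOperations()` / `asPrefixFileIO()`) to Iceberg's `SupportsBulkOperations`: callers test the capability, then cast through the accessor. Roughly how a caller is expected to use it; the `delete` method and its arguments are illustrative, not part of the patch:

import java.util.Set;

import com.netease.arctic.io.ArcticFileIO;

class BulkDeleteSketch {
  static void delete(ArcticFileIO io, Set<String> paths) {
    if (io.supportBulkOperations()) {
      // one bulk request where the backing store supports it (e.g. the Hadoop FileIO above)
      io.asBulkFileIO().deleteFiles(paths);
    } else {
      // per-file fallback for FileIO implementations without bulk support
      paths.forEach(io::deleteFile);
    }
  }
}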
diff --git a/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java b/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java
index 84053d28a3..4ed345539b 100644
--- a/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java
+++ b/core/src/main/java/com/netease/arctic/utils/TableFileUtil.java
@@ -20,12 +20,16 @@
 import com.netease.arctic.io.ArcticFileIO;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.URI;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TableFileUtil {
   private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class);
@@ -62,7 +66,7 @@ public static String getFileDir(String filePath) {
   public static void deleteEmptyDirectory(
       ArcticFileIO io, String directoryPath, Set<String> exclude) {
     if (!io.exists(directoryPath)) {
-      LOG.warn("The target directory {} does not exist or has been deleted", directoryPath);
+      LOG.debug("The target directory {} does not exist or has been deleted", directoryPath);
       return;
     }
     String parent = new Path(directoryPath).getParent().toString();
@@ -80,6 +84,82 @@ public static void deleteEmptyDirectory(
     }
   }
 
+  /**
+   * Helper to delete files. Bulk deletion is used if possible.
+   *
+   * @param io arctic file io
+   * @param files files to delete
+   * @param workPool executor pool. Only applicable for non-bulk FileIO
+   * @return deleted file count
+   */
+  public static int deleteFiles(ArcticFileIO io, Set<String> files, ExecutorService workPool) {
+    if (files == null || files.isEmpty()) {
+      return 0;
+    }
+
+    AtomicInteger failedFileCnt = new AtomicInteger(0);
+    if (io.supportBulkOperations()) {
+      try {
+        io.asBulkFileIO().deleteFiles(files);
+      } catch (BulkDeletionFailureException e) {
+        failedFileCnt.set(e.numberFailedObjects());
+        LOG.warn("Failed to bulk delete {} files", e.numberFailedObjects(), e);
+      } catch (RuntimeException e) {
+        failedFileCnt.set(files.size());
+        LOG.warn("Failed to bulk delete files", e);
+      }
+    } else {
+      if (workPool != null) {
+        Tasks.foreach(files)
+            .executeWith(workPool)
+            .noRetry()
+            .suppressFailureWhenFinished()
+            .onFailure(
+                (file, exc) -> {
+                  failedFileCnt.addAndGet(1);
+                  LOG.warn("Failed to delete file {}", file, exc);
+                })
+            .run(io::deleteFile);
+      } else {
+        files.forEach(
+            f -> {
+              try {
+                io.deleteFile(f);
+              } catch (RuntimeException e) {
+                failedFileCnt.addAndGet(1);
+                LOG.warn("Failed to delete file {}", f, e);
+              }
+            });
+      }
+    }
+
+    return files.size() - failedFileCnt.get();
+  }
+
+  /**
+   * Helper to delete files sequentially.
+   *
+   * @param io arctic file io
+   * @param files files to delete
+   * @return deleted file count
+   */
+  public static int deleteFiles(ArcticFileIO io, Set<String> files) {
+    return deleteFiles(io, files, null);
+  }
+
+  /**
+   * Helper to delete files in parallel.
+   *
+   * @param io arctic file io
+   * @param files files to delete
+   * @param workPool executor pool
+   * @return deleted file count
+   */
+  public static int parallelDeleteFiles(
+      ArcticFileIO io, Set<String> files, ExecutorService workPool) {
+    return deleteFiles(io, files, workPool);
+  }
+
   /**
    * Get the file path after move file to target directory
    *
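For reference, the division of labor among the new `TableFileUtil` helpers: both public entry points funnel into the three-argument `deleteFiles`, which prefers the bulk path when the FileIO advertises it and otherwise falls back to per-file deletes, parallelized over the pool when one is given. A usage sketch; the class, method, and variable names below are illustrative, not part of the patch:

import java.util.Set;

import org.apache.iceberg.util.ThreadPools;

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.utils.TableFileUtil;

class DeleteFilesSketch {
  /** Sequential form: bulk delete if supported, else one deleteFile() call per path. */
  static int cleanSequentially(ArcticFileIO io, Set<String> orphans) {
    return TableFileUtil.deleteFiles(io, orphans);
  }

  /** Parallel form: same bulk fast path; a non-bulk FileIO fans deletes out over the pool. */
  static int cleanInParallel(ArcticFileIO io, Set<String> orphans) {
    return TableFileUtil.parallelDeleteFiles(io, orphans, ThreadPools.getWorkerPool());
  }
}

Both return the number of files actually deleted, so callers can log slated-versus-deleted counts the way the maintainer code above does.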