diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/fileio/ForwardingFileIo.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/fileio/ForwardingFileIo.java index b2bb59ee8e7c..214107bfcb40 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/fileio/ForwardingFileIo.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/fileio/ForwardingFileIo.java @@ -13,19 +13,27 @@ */ package io.trino.filesystem.fileio; +import com.google.common.collect.Iterables; import io.trino.filesystem.TrinoFileSystem; -import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; +import java.util.stream.Stream; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; public class ForwardingFileIo - implements FileIO + implements SupportsBulkOperations { + private static final int DELETE_BATCH_SIZE = 1000; + private static final int BATCH_DELETE_PATHS_MESSAGE_LIMIT = 5; + private final TrinoFileSystem fileSystem; public ForwardingFileIo(TrinoFileSystem fileSystem) @@ -61,4 +69,29 @@ public void deleteFile(String path) throw new UncheckedIOException("Failed to delete file: " + path, e); } } + + @Override + public void deleteFiles(Iterable pathsToDelete) + throws BulkDeletionFailureException + { + Iterable> partitions = Iterables.partition(pathsToDelete, DELETE_BATCH_SIZE); + partitions.forEach(this::deleteBatch); + } + + private void deleteBatch(List filesToDelete) + { + try { + fileSystem.deleteFiles(filesToDelete); + } + catch (IOException e) { + throw new UncheckedIOException( + "Failed to delete some or all of files: " + + Stream.concat( + filesToDelete.stream() + .limit(BATCH_DELETE_PATHS_MESSAGE_LIMIT), + filesToDelete.size() > BATCH_DELETE_PATHS_MESSAGE_LIMIT ? Stream.of("...") : Stream.of()) + .collect(joining(", ", "[", "]")), + e); + } + } }