From f9cdfe50badcebb2e47229234920321044c24b1d Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 25 Jan 2023 15:10:39 -0600 Subject: [PATCH 01/12] API, Spark: Use Bulk Delete Methods instead of Single Deletes when Available Previously deletes were handled by a per Action execution service that would be used to parallelize single deletes. In this PR we move the responsibility of performing the deletes and the parallelization of those deletes to the FileIO via SupportsBulkOperations. This deprecates all methods which used to be used for doing single deletes as well as passing executor services to Actions which delete many files. --- .palantir/revapi.yml | 4 ++ .../iceberg/actions/DeleteReachableFiles.java | 19 ++++++- .../apache/iceberg/hadoop/HadoopFileIO.java | 56 ++++++++++++++++++- .../spark/actions/BaseSparkAction.java | 55 ++++++++++++++++++ .../DeleteReachableFilesSparkAction.java | 34 ++++++++++- .../TestDeleteReachableFilesAction.java | 56 ------------------- 6 files changed, 163 insertions(+), 61 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 781cf369dff9..1880bc582184 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -66,6 +66,10 @@ acceptedBreaks: old: "method void org.apache.iceberg.io.DataWriter::add(T)" justification: "Removing deprecated method" "1.1.0": + org.apache.iceberg:iceberg-api: + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.actions.DeleteReachableFiles org.apache.iceberg.actions.DeleteReachableFiles::deleteBulkWith(java.util.function.Consumer>)" + justification: "Adds new API for bulk deletes, no old apis are broken." org.apache.iceberg:iceberg-core: - code: "java.class.removed" old: "class org.apache.iceberg.rest.HTTPClientFactory" diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java index aa15ded71450..b1a59942b43e 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java @@ -40,17 +40,32 @@ public interface DeleteReachableFiles * @param deleteFunc a function that will be called to delete files. The function accepts path to * file as an argument. * @return this for method chaining + * @deprecated Deletes are now performed in bulk see {@link #deleteBulkWith(Consumer)}. This + * method will only be used if the FileIO does not implement {@link + * org.apache.iceberg.io.SupportsBulkOperations} */ + @Deprecated DeleteReachableFiles deleteWith(Consumer deleteFunc); /** - * Passes an alternative executor service that will be used for files removal. + * Passes an alternative delete implementation that will be used for files. * - *

If this method is not called, files will be deleted in the current thread. + * @param deleteFunc a function that will be called to delete files. The function accepts paths to + * files as an argument. + * @return this for method chaining + */ + DeleteReachableFiles deleteBulkWith(Consumer> deleteFunc); + + /** + * Passes an alternative executor service that will be used for files removal. * * @param executorService the service to use * @return this for method chaining + * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO + * specific parallelism controls to adjust bulk delete concurrency within that api. If this + * method is not called, files will be deleted in the current thread. */ + @Deprecated DeleteReachableFiles executeDeleteWith(ExecutorService executorService); /** diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 8f34994d6374..207207e78c1d 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -18,27 +18,42 @@ */ package org.apache.iceberg.hadoop; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations { +public class HadoopFileIO + implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism"; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; private SerializableSupplier hadoopConf; private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); @@ -149,6 +164,45 @@ public void deletePrefix(String prefix) { } } + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { + AtomicInteger failureCount = new AtomicInteger(0); + Tasks.foreach(pathsToDelete) + .executeWith(executorService()) + .retry(3) + .stopRetryOn(FileNotFoundException.class) + .onFailure( + (f, e) -> { + LOG.error("Failure during bulk delete on file: {} ", f, e); + failureCount.incrementAndGet(); + }) + .run(this::deleteFile); + + if (failureCount.get() != 0) { + throw new BulkDeletionFailureException(failureCount.get()); + } + } + + private int deleteThreads() { + return conf() + .getInt( + DELETE_FILE_PARALLELISM, + Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE); + } + + private ExecutorService executorService() { + if (executorService == null) { + synchronized (HadoopFileIO.class) { + if (executorService == null) { + executorService = + ThreadPools.newWorkerPool("iceberg-hadoopfileio-delete", deleteThreads()); + } + } + } + + return executorService; + } + /** * This class is a simple adaptor to allow for using Hadoop's RemoteIterator as an Iterator. * diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index cdd80040fa9e..0c151800bdce 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.iceberg.AllManifestsTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; @@ -46,12 +47,14 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.ClosingIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.JobGroupInfo; @@ -85,6 +88,7 @@ abstract class BaseSparkAction { private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class); private static final AtomicInteger JOB_COUNTER = new AtomicInteger(); private static final int DELETE_NUM_RETRIES = 3; + private static final int DELETE_GROUP_SIZE = 100000; private final SparkSession spark; private final JavaSparkContext sparkContext; @@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles( return summary; } + protected DeleteSummary deleteFiles( + Consumer> bulkDeleteFunc, Iterator files) { + DeleteSummary summary = new DeleteSummary(); + Iterator> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE); + + Tasks.foreach(groupedIterator) + .suppressFailureWhenFinished() + .run( + fileList -> { + Map> filesByType = + fileList.stream().collect(Collectors.groupingBy(FileInfo::getType)); + filesByType.entrySet().stream() + .forEach( + entry -> { + List pathsToDelete = + entry.getValue().stream() + .map(FileInfo::getPath) + .collect(Collectors.toList()); + int failures = 0; + try { + bulkDeleteFunc.accept(pathsToDelete); + } catch (BulkDeletionFailureException bulkDeletionFailureException) { + failures = bulkDeletionFailureException.numberFailedObjects(); + } + summary.deletedFiles(entry.getKey(), pathsToDelete.size() - failures); + }); + }); + + LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount()); + + return summary; + } + static class DeleteSummary { private final AtomicLong dataFilesCount = new AtomicLong(0L); private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L); @@ -261,6 +298,24 @@ static class DeleteSummary { private final AtomicLong manifestListsCount = new AtomicLong(0L); private final AtomicLong otherFilesCount = new AtomicLong(0L); + public void deletedFiles(String type, int numFiles) { + if (FileContent.DATA.name().equalsIgnoreCase(type)) { + dataFilesCount.addAndGet(numFiles); + } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) { + positionDeleteFilesCount.addAndGet(numFiles); + } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) { + equalityDeleteFilesCount.addAndGet(numFiles); + } else if (MANIFEST.equalsIgnoreCase(type)) { + manifestsCount.addAndGet(numFiles); + } else if (MANIFEST_LIST.equalsIgnoreCase(type)) { + manifestListsCount.addAndGet(numFiles); + } else if (OTHERS.equalsIgnoreCase(type)) { + otherFilesCount.addAndGet(numFiles); + } else { + throw new ValidationException("Illegal file type: %s", type); + } + } + public void deletedFile(String path, String type) { if (FileContent.DATA.name().equalsIgnoreCase(type)) { dataFilesCount.incrementAndGet(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index 0f01afa287bb..b757a4592ac3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -32,6 +32,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.util.PropertyUtil; @@ -54,6 +55,8 @@ public class DeleteReachableFilesSparkAction private static final Logger LOG = LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class); private final String metadataFileLocation; + + @Deprecated private final Consumer defaultDelete = new Consumer() { @Override @@ -62,7 +65,19 @@ public void accept(String file) { } }; - private Consumer deleteFunc = defaultDelete; + @Deprecated private Consumer deleteFunc = defaultDelete; + + private final Consumer> defaultBulkDelete = + new Consumer>() { + @Override + public void accept(Iterable paths) { + Preconditions.checkArgument( + io instanceof SupportsBulkOperations, "FileIO {} does not support bulk deletes", io); + ((SupportsBulkOperations) io).deleteFiles(paths); + } + }; + + private Consumer> bulkDeleteFunc = defaultBulkDelete; private ExecutorService deleteExecutorService = null; private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf()); @@ -94,6 +109,12 @@ public DeleteReachableFilesSparkAction executeDeleteWith(ExecutorService executo return this; } + @Override + public DeleteReachableFiles deleteBulkWith(Consumer> newBulkDeleteFunc) { + this.bulkDeleteFunc = newBulkDeleteFunc; + return this; + } + @Override public Result execute() { Preconditions.checkArgument(io != null, "File IO cannot be null"); @@ -132,7 +153,16 @@ private Dataset reachableFileDS(TableMetadata metadata) { } private DeleteReachableFiles.Result deleteFiles(Iterator files) { - DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files); + DeleteSummary summary; + if (io instanceof SupportsBulkOperations) { + LOG.info("Triggering Bulk Delete Operations"); + + summary = deleteFiles(bulkDeleteFunc, files); + } else { + LOG.warn("Warning falling back to non-bulk deletes"); + summary = deleteFiles(deleteExecutorService, deleteFunc, files); + } + LOG.info("Deleted {} total files", summary.totalFilesCount()); return new BaseDeleteReachableFilesActionResult( diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java index 154e940519d1..4f95a73bd729 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java @@ -21,10 +21,6 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import java.io.File; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.StreamSupport; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; @@ -44,11 +40,8 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -159,55 +152,6 @@ private void checkRemoveFilesResults( results.deletedOtherFilesCount()); } - @Test - public void dataFilesCleanupWithParallelTasks() { - table.newFastAppend().appendFile(FILE_A).commit(); - - table.newFastAppend().appendFile(FILE_B).commit(); - - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); - - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); - - Set deletedFiles = ConcurrentHashMap.newKeySet(); - Set deleteThreads = ConcurrentHashMap.newKeySet(); - AtomicInteger deleteThreadsIndex = new AtomicInteger(0); - - DeleteReachableFiles.Result result = - sparkActions() - .deleteReachableFiles(metadataLocation(table)) - .io(table.io()) - .executeDeleteWith( - Executors.newFixedThreadPool( - 4, - runnable -> { - Thread thread = new Thread(runnable); - thread.setName("remove-files-" + deleteThreadsIndex.getAndIncrement()); - thread.setDaemon( - true); // daemon threads will be terminated abruptly when the JVM exits - return thread; - })) - .deleteWith( - s -> { - deleteThreads.add(Thread.currentThread().getName()); - deletedFiles.add(s); - }) - .execute(); - - // Verifies that the delete methods ran in the threads created by the provided ExecutorService - // ThreadFactory - Assert.assertEquals( - deleteThreads, - Sets.newHashSet("remove-files-0", "remove-files-1", "remove-files-2", "remove-files-3")); - - Lists.newArrayList(FILE_A, FILE_B, FILE_C, FILE_D) - .forEach( - file -> - Assert.assertTrue( - "FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()))); - checkRemoveFilesResults(4L, 0, 0, 6L, 4L, 6, result); - } - @Test public void testWithExpiringDanglingStageCommit() { table.location(); From cf89f8c5fe08562358db0aaa8911bac673ac79f0 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 25 Jan 2023 17:24:43 -0600 Subject: [PATCH 02/12] Bulk delete with Orphan Files and Expire Snapshots --- .palantir/revapi.yml | 6 ++ .../iceberg/actions/DeleteOrphanFiles.java | 9 ++ .../iceberg/actions/DeleteReachableFiles.java | 3 +- .../iceberg/actions/ExpireSnapshots.java | 9 ++ .../actions/DeleteOrphanFilesSparkAction.java | 44 ++++++++-- .../DeleteReachableFilesSparkAction.java | 3 +- .../actions/ExpireSnapshotsSparkAction.java | 30 ++++++- .../procedures/ExpireSnapshotsProcedure.java | 10 +++ .../RemoveOrphanFilesProcedure.java | 9 ++ .../actions/TestExpireSnapshotsAction.java | 86 ++++--------------- 10 files changed, 127 insertions(+), 82 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 1880bc582184..0bc16a710036 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -67,9 +67,15 @@ acceptedBreaks: justification: "Removing deprecated method" "1.1.0": org.apache.iceberg:iceberg-api: + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.actions.DeleteOrphanFiles org.apache.iceberg.actions.DeleteOrphanFiles::deleteBulkWith(java.util.function.Consumer>)" + justification: "New Apis to allow for Bulk Deletion" - code: "java.method.addedToInterface" new: "method org.apache.iceberg.actions.DeleteReachableFiles org.apache.iceberg.actions.DeleteReachableFiles::deleteBulkWith(java.util.function.Consumer>)" justification: "Adds new API for bulk deletes, no old apis are broken." + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.actions.ExpireSnapshots org.apache.iceberg.actions.ExpireSnapshots::deleteBulkWith(java.util.function.Consumer>)" + justification: "New Apis to allow for Bulk Deletion" org.apache.iceberg:iceberg-core: - code: "java.class.removed" old: "class org.apache.iceberg.rest.HTTPClientFactory" diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index 0e00eb67b217..e78922d5444e 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -67,7 +67,11 @@ public interface DeleteOrphanFiles extends Action deleteFunc); /** @@ -80,9 +84,14 @@ public interface DeleteOrphanFiles extends Action> deleteFunc); + /** * Passes a prefix mismatch mode that determines how this action should handle situations when the * metadata references files that match listed/provided files except for authority/scheme. diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java index b1a59942b43e..6ff2c4b008f8 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java @@ -62,8 +62,7 @@ public interface DeleteReachableFiles * @param executorService the service to use * @return this for method chaining * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO - * specific parallelism controls to adjust bulk delete concurrency within that api. If this - * method is not called, files will be deleted in the current thread. + * specific parallelism controls to adjust bulk delete concurrency within that api. */ @Deprecated DeleteReachableFiles executeDeleteWith(ExecutorService executorService); diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index 2b1924c4d720..53b5d5af3e54 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -76,7 +76,11 @@ public interface ExpireSnapshots extends Action deleteFunc); /** @@ -90,9 +94,14 @@ public interface ExpireSnapshots extends Action> deleteFunc); + /** The action result that contains a summary of the execution. */ interface Result { /** Returns the number of deleted data files. */ diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 1abd2107ed7f..8ddb79402afb 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -48,6 +48,8 @@ import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -119,6 +121,20 @@ public void accept(String file) { } }; + private final Consumer> defaultBulkDelete = + new Consumer>() { + @Override + public void accept(Iterable paths) { + Preconditions.checkArgument( + table.io() instanceof SupportsBulkOperations, + "FileIO {} does not support bulk deletes", + table.io()); + ((SupportsBulkOperations) table.io()).deleteFiles(paths); + } + }; + + private Consumer> bulkDeleteFunc = defaultBulkDelete; + private Map equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT); private Map equalAuthorities = Collections.emptyMap(); private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR; @@ -152,6 +168,12 @@ public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorSe return this; } + @Override + public DeleteOrphanFiles deleteBulkWith(Consumer> newBulkDeleteFunc) { + this.bulkDeleteFunc = newBulkDeleteFunc; + return this; + } + @Override public DeleteOrphanFilesSparkAction prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) { this.prefixMismatchMode = newPrefixMismatchMode; @@ -246,12 +268,22 @@ private DeleteOrphanFiles.Result doExecute() { List orphanFiles = findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode); - Tasks.foreach(orphanFiles) - .noRetry() - .executeWith(deleteExecutorService) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) - .run(deleteFunc::accept); + if (table.io() instanceof SupportsBulkOperations) { + try { + bulkDeleteFunc.accept(orphanFiles); + } catch (BulkDeletionFailureException bulkDeletionFailureException) { + LOG.warn( + "Bulk delete failed to remove {} files", + bulkDeletionFailureException.numberFailedObjects()); + } + } else { + Tasks.foreach(orphanFiles) + .noRetry() + .executeWith(deleteExecutorService) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) + .run(deleteFunc::accept); + } return new BaseDeleteOrphanFilesActionResult(orphanFiles); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index b757a4592ac3..c5c6c29ecd04 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -65,7 +65,7 @@ public void accept(String file) { } }; - @Deprecated private Consumer deleteFunc = defaultDelete; + private Consumer deleteFunc = defaultDelete; private final Consumer> defaultBulkDelete = new Consumer>() { @@ -156,7 +156,6 @@ private DeleteReachableFiles.Result deleteFiles(Iterator files) { DeleteSummary summary; if (io instanceof SupportsBulkOperations) { LOG.info("Triggering Bulk Delete Operations"); - summary = deleteFiles(bulkDeleteFunc, files); } else { LOG.warn("Warning falling back to non-bulk deletes"); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index d9af48c221f1..c5312543717c 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -35,6 +35,7 @@ import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult; import org.apache.iceberg.actions.ExpireSnapshots; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -86,6 +87,19 @@ public void accept(String file) { private Long expireOlderThanValue = null; private Integer retainLastValue = null; private Consumer deleteFunc = defaultDelete; + private final Consumer> defaultBulkDelete = + new Consumer>() { + @Override + public void accept(Iterable paths) { + Preconditions.checkArgument( + ops.io() instanceof SupportsBulkOperations, + "FileIO {} does not support bulk deletes", + ops.io()); + ((SupportsBulkOperations) ops.io()).deleteFiles(paths); + } + }; + + private Consumer> bulkDeleteFunc = defaultBulkDelete; private ExecutorService deleteExecutorService = null; private Dataset expiredFileDS = null; @@ -110,6 +124,12 @@ public ExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorServ return this; } + @Override + public ExpireSnapshots deleteBulkWith(Consumer> newBulkDeleteFunc) { + this.bulkDeleteFunc = newBulkDeleteFunc; + return this; + } + @Override public ExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) { expiredSnapshotIds.add(snapshotId); @@ -265,7 +285,15 @@ private Set findExpiredSnapshotIds( } private ExpireSnapshots.Result deleteFiles(Iterator files) { - DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files); + DeleteSummary summary; + if (ops.io() instanceof SupportsBulkOperations) { + LOG.info("Triggering Bulk Delete Operations"); + summary = deleteFiles(bulkDeleteFunc, files); + } else { + LOG.warn("Warning falling back to non-bulk deletes"); + summary = deleteFiles(deleteExecutorService, deleteFunc, files); + } + LOG.info("Deleted {} total files", summary.totalFilesCount()); return new BaseExpireSnapshotsActionResult( diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index aff4b44f94fa..2739e4291bb8 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -33,6 +33,8 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A procedure that expires snapshots in a table. @@ -41,6 +43,8 @@ */ public class ExpireSnapshotsProcedure extends BaseProcedure { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class); + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), @@ -93,6 +97,12 @@ public InternalRow[] call(InternalRow args) { Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1)); Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2); Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3); + if (maxConcurrentDeletes != null) { + LOG.warn( + "{} is now deprecated, parallelism should now be configured in the FileIO bulk operations. Check the" + + "configured FileIO for more information", + PARAMETERS[3].name()); + } Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4); Preconditions.checkArgument( diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index f49f37c02ea5..b63b08fd9c9e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -39,6 +39,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.runtime.BoxedUnit; /** @@ -47,6 +49,7 @@ * @see SparkActions#deleteOrphanFiles(Table) */ public class RemoveOrphanFilesProcedure extends BaseProcedure { + private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class); private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { @@ -98,6 +101,12 @@ public InternalRow[] call(InternalRow args) { String location = args.isNullAt(2) ? null : args.getString(2); boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3); Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4); + if (maxConcurrentDeletes != null) { + LOG.warn( + "{} is now deprecated, parallelism should now be configured in the FileIO bulk operations. Check the" + + "configured FileIO for more information", + PARAMETERS[4].name()); + } String fileListView = args.isNullAt(5) ? null : args.getString(5); Preconditions.checkArgument( diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java index 7004c6f8e079..e4a4951f9442 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java @@ -21,12 +21,8 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import java.io.File; -import java.io.IOException; import java.util.List; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; @@ -48,7 +44,6 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -195,57 +190,6 @@ public void testFilesCleaned() throws Exception { checkExpirationResults(1L, 0L, 0L, 1L, 2L, results); } - @Test - public void dataFilesCleanupWithParallelTasks() throws IOException { - - table.newFastAppend().appendFile(FILE_A).commit(); - - table.newFastAppend().appendFile(FILE_B).commit(); - - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); - - table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); - - long t4 = rightAfterSnapshot(); - - Set deletedFiles = Sets.newHashSet(); - Set deleteThreads = ConcurrentHashMap.newKeySet(); - AtomicInteger deleteThreadsIndex = new AtomicInteger(0); - - ExpireSnapshots.Result result = - SparkActions.get() - .expireSnapshots(table) - .executeDeleteWith( - Executors.newFixedThreadPool( - 4, - runnable -> { - Thread thread = new Thread(runnable); - thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement()); - thread.setDaemon( - true); // daemon threads will be terminated abruptly when the JVM exits - return thread; - })) - .expireOlderThan(t4) - .deleteWith( - s -> { - deleteThreads.add(Thread.currentThread().getName()); - deletedFiles.add(s); - }) - .execute(); - - // Verifies that the delete methods ran in the threads created by the provided ExecutorService - // ThreadFactory - Assert.assertEquals( - deleteThreads, - Sets.newHashSet( - "remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3")); - - Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); - Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); - - checkExpirationResults(2L, 0L, 0L, 3L, 3L, result); - } - @Test public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { table.newFastAppend().appendFile(FILE_A).commit(); @@ -553,7 +497,7 @@ public void testScanExpiredManifestInValidSnapshotAppend() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(t3) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); @@ -582,7 +526,7 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(t3) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); @@ -614,7 +558,7 @@ public void testWithExpiringDanglingStageCommit() { ExpireSnapshots.Result result = SparkActions.get() .expireSnapshots(table) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .expireOlderThan(snapshotB.timestampMillis() + 1) .execute(); @@ -686,7 +630,7 @@ public void testWithCherryPickTableSnapshot() { ExpireSnapshots.Result result = SparkActions.get() .expireSnapshots(table) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .expireOlderThan(snapshotC.timestampMillis() + 1) .execute(); @@ -735,7 +679,7 @@ public void testWithExpiringStagedThenCherrypick() { ExpireSnapshots.Result firstResult = SparkActions.get() .expireSnapshots(table) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .expireSnapshotId(snapshotB.snapshotId()) .execute(); @@ -755,7 +699,7 @@ public void testWithExpiringStagedThenCherrypick() { ExpireSnapshots.Result secondResult = SparkActions.get() .expireSnapshots(table) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .expireOlderThan(table.currentSnapshot().timestampMillis() + 1) .execute(); @@ -792,7 +736,7 @@ public void testExpireOlderThan() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertEquals( @@ -841,7 +785,7 @@ public void testExpireOlderThanWithDelete() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertEquals( @@ -914,7 +858,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertEquals( @@ -976,7 +920,7 @@ public void testExpireOlderThanWithRollback() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertEquals( @@ -1031,7 +975,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertEquals( @@ -1090,7 +1034,7 @@ public void testExpireOlderThanWithDeleteFile() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(afterAllDeleted) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Set expectedDeletes = @@ -1132,7 +1076,7 @@ public void testExpireOnEmptyTable() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); checkExpirationResults(0, 0, 0, 0, 0, result); @@ -1312,7 +1256,7 @@ public void textExpireAllCheckFilesDeleted(int dataFilesExpired, int dataFilesRe SparkActions.get() .expireSnapshots(table) .expireOlderThan(end) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); Assert.assertEquals( @@ -1341,7 +1285,7 @@ public void testExpireSomeCheckFilesDeleted() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(after) - .deleteWith(deletedFiles::add) + .deleteBulkWith(files -> files.forEach(deletedFiles::add)) .execute(); // C, D should be retained (live) From 40f4b2a62ef0fcb95165da3260ed969567fde00b Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Mon, 30 Jan 2023 17:39:14 -0600 Subject: [PATCH 03/12] Add default, address review comments --- .../java/org/apache/iceberg/actions/DeleteOrphanFiles.java | 4 +++- .../java/org/apache/iceberg/actions/DeleteReachableFiles.java | 4 +++- .../main/java/org/apache/iceberg/actions/ExpireSnapshots.java | 4 +++- .../iceberg/spark/actions/ExpireSnapshotsSparkAction.java | 3 ++- .../iceberg/spark/procedures/RemoveOrphanFilesProcedure.java | 1 + 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index e78922d5444e..62203591ab95 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -90,7 +90,9 @@ public interface DeleteOrphanFiles extends Action> deleteFunc); + default DeleteOrphanFiles deleteBulkWith(Consumer> deleteFunc) { + throw new UnsupportedOperationException(); + } /** * Passes a prefix mismatch mode that determines how this action should handle situations when the diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java index 6ff2c4b008f8..a0f7aa9e4156 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java @@ -54,7 +54,9 @@ public interface DeleteReachableFiles * files as an argument. * @return this for method chaining */ - DeleteReachableFiles deleteBulkWith(Consumer> deleteFunc); + default DeleteReachableFiles deleteBulkWith(Consumer> deleteFunc) { + throw new UnsupportedOperationException(); + } /** * Passes an alternative executor service that will be used for files removal. diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index 53b5d5af3e54..774f5ebbdd14 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -100,7 +100,9 @@ public interface ExpireSnapshots extends Action> deleteFunc); + default ExpireSnapshots deleteBulkWith(Consumer> deleteFunc){ + throw new UnsupportedOperationException(); + } /** The action result that contains a summary of the execution. */ interface Result { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index c5312543717c..a3a58de73263 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -290,7 +290,8 @@ private ExpireSnapshots.Result deleteFiles(Iterator files) { LOG.info("Triggering Bulk Delete Operations"); summary = deleteFiles(bulkDeleteFunc, files); } else { - LOG.warn("Warning falling back to non-bulk deletes"); + LOG.warn("Falling back to non-bulk deletes because the given io {} does not support bulk deletes. Using an IO" + + "with bulk deletes will provide better throughput.", ops.io()); summary = deleteFiles(deleteExecutorService, deleteFunc, files); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index b63b08fd9c9e..6ac7d5ecb829 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -107,6 +107,7 @@ public InternalRow[] call(InternalRow args) { + "configured FileIO for more information", PARAMETERS[4].name()); } + String fileListView = args.isNullAt(5) ? null : args.getString(5); Preconditions.checkArgument( From 548007369a99eb3322c935e09439700a27fdc4b7 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 31 Jan 2023 09:10:59 -0600 Subject: [PATCH 04/12] Spotless --- .../java/org/apache/iceberg/actions/ExpireSnapshots.java | 2 +- .../iceberg/spark/actions/ExpireSnapshotsSparkAction.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index 774f5ebbdd14..66c307ea6ea9 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -100,7 +100,7 @@ public interface ExpireSnapshots extends Action> deleteFunc){ + default ExpireSnapshots deleteBulkWith(Consumer> deleteFunc) { throw new UnsupportedOperationException(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index a3a58de73263..de2557928405 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -290,8 +290,10 @@ private ExpireSnapshots.Result deleteFiles(Iterator files) { LOG.info("Triggering Bulk Delete Operations"); summary = deleteFiles(bulkDeleteFunc, files); } else { - LOG.warn("Falling back to non-bulk deletes because the given io {} does not support bulk deletes. Using an IO" + - "with bulk deletes will provide better throughput.", ops.io()); + LOG.warn( + "Falling back to non-bulk deletes because the given io {} does not support bulk deletes. Using an IO" + + "with bulk deletes will provide better throughput.", + ops.io()); summary = deleteFiles(deleteExecutorService, deleteFunc, files); } From 275ee771ccf3a483704529281106e48636fac7e1 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Fri, 3 Feb 2023 17:24:58 -0600 Subject: [PATCH 05/12] Remove Deprecations and new bulkDeleteWith --- .palantir/revapi.yml | 10 --- .../iceberg/actions/DeleteOrphanFiles.java | 26 ++---- .../iceberg/actions/DeleteReachableFiles.java | 24 ++---- .../iceberg/actions/ExpireSnapshots.java | 20 ++--- .../apache/iceberg/BaseIncrementalScan.java | 3 +- .../apache/iceberg/hadoop/HadoopFileIO.java | 1 + .../iceberg/hadoop/HadoopFileIOTest.java | 30 ++++++- .../spark/actions/BaseSparkAction.java | 6 +- .../actions/DeleteOrphanFilesSparkAction.java | 40 +++------ .../DeleteReachableFilesSparkAction.java | 35 +++----- .../actions/ExpireSnapshotsSparkAction.java | 39 +++------ .../TestDeleteReachableFilesAction.java | 56 ++++++++++++ .../actions/TestExpireSnapshotsAction.java | 86 +++++++++++++++---- 13 files changed, 214 insertions(+), 162 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 0bc16a710036..781cf369dff9 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -66,16 +66,6 @@ acceptedBreaks: old: "method void org.apache.iceberg.io.DataWriter::add(T)" justification: "Removing deprecated method" "1.1.0": - org.apache.iceberg:iceberg-api: - - code: "java.method.addedToInterface" - new: "method org.apache.iceberg.actions.DeleteOrphanFiles org.apache.iceberg.actions.DeleteOrphanFiles::deleteBulkWith(java.util.function.Consumer>)" - justification: "New Apis to allow for Bulk Deletion" - - code: "java.method.addedToInterface" - new: "method org.apache.iceberg.actions.DeleteReachableFiles org.apache.iceberg.actions.DeleteReachableFiles::deleteBulkWith(java.util.function.Consumer>)" - justification: "Adds new API for bulk deletes, no old apis are broken." - - code: "java.method.addedToInterface" - new: "method org.apache.iceberg.actions.ExpireSnapshots org.apache.iceberg.actions.ExpireSnapshots::deleteBulkWith(java.util.function.Consumer>)" - justification: "New Apis to allow for Bulk Deletion" org.apache.iceberg:iceberg-core: - code: "java.class.removed" old: "class org.apache.iceberg.rest.HTTPClientFactory" diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index 62203591ab95..02e0c8e6593b 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -59,7 +59,7 @@ public interface DeleteOrphanFiles extends ActionThis method allows users to customize the delete func. For example, one may set a custom + *

This method allows users to customize the delete function. For example, one may set a custom * delete func and collect all orphan files into a set instead of physically removing them. * *

If not set, defaults to using the table's {@link org.apache.iceberg.io.FileIO io} @@ -67,33 +67,25 @@ public interface DeleteOrphanFiles extends Action deleteFunc); /** - * Passes an alternative executor service that will be used for removing orphaned files. + * Passes an alternative executor service that will be used for removing orphaned files. This + * service will only be used if a custom delete function is provided by {@link + * #deleteWith(Consumer)} or if the FileIO does not {@link + * org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}. Otherwise, parallelism + * should be controlled by the IO specific {@link + * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method. * - *

If this method is not called, orphaned manifests and data files will still be deleted in the - * current thread. - * - *

+ *

If this method is not called and bulk deletes are not supported, orphaned manifests and data + * files will still be deleted in the current thread. * * @param executorService the service to use * @return this for method chaining - * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO - * specific parallelism controls to adjust bulk delete concurrency within that api. */ - @Deprecated DeleteOrphanFiles executeDeleteWith(ExecutorService executorService); - default DeleteOrphanFiles deleteBulkWith(Consumer> deleteFunc) { - throw new UnsupportedOperationException(); - } - /** * Passes a prefix mismatch mode that determines how this action should handle situations when the * metadata references files that match listed/provided files except for authority/scheme. diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java index a0f7aa9e4156..d310ca8db9e9 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java @@ -40,33 +40,19 @@ public interface DeleteReachableFiles * @param deleteFunc a function that will be called to delete files. The function accepts path to * file as an argument. * @return this for method chaining - * @deprecated Deletes are now performed in bulk see {@link #deleteBulkWith(Consumer)}. This - * method will only be used if the FileIO does not implement {@link - * org.apache.iceberg.io.SupportsBulkOperations} */ - @Deprecated DeleteReachableFiles deleteWith(Consumer deleteFunc); /** - * Passes an alternative delete implementation that will be used for files. - * - * @param deleteFunc a function that will be called to delete files. The function accepts paths to - * files as an argument. - * @return this for method chaining - */ - default DeleteReachableFiles deleteBulkWith(Consumer> deleteFunc) { - throw new UnsupportedOperationException(); - } - - /** - * Passes an alternative executor service that will be used for files removal. + * Passes an alternative executor service that will be used for files removal. This service will + * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the + * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}. + * Otherwise, parallelism should be controlled by the IO specific {@link + * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method. * * @param executorService the service to use * @return this for method chaining - * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO - * specific parallelism controls to adjust bulk delete concurrency within that api. */ - @Deprecated DeleteReachableFiles executeDeleteWith(ExecutorService executorService); /** diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index 66c307ea6ea9..b0d3a9d212c6 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -76,34 +76,28 @@ public interface ExpireSnapshots extends Action deleteFunc); /** - * Passes an alternative executor service that will be used for manifests, data and delete files - * deletion. + * Passes an alternative executor service that will be used for files removal. This service will + * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the + * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}. + * Otherwise, parallelism should be controlled by the IO specific {@link + * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method. * - *

If this method is not called, unnecessary manifests and content files will still be deleted - * in the current thread. + *

If this method is not called and bulk deletes are not supported, unnecessary manifests and + * content files will still be deleted in the current thread. * *

Identical to {@link org.apache.iceberg.ExpireSnapshots#executeDeleteWith(ExecutorService)} * * @param executorService the service to use * @return this for method chaining - * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO - * specific parallelism controls to adjust bulk delete concurrency within that api. */ @Deprecated ExpireSnapshots executeDeleteWith(ExecutorService executorService); - default ExpireSnapshots deleteBulkWith(Consumer> deleteFunc) { - throw new UnsupportedOperationException(); - } - /** The action result that contains a summary of the execution. */ interface Result { /** Returns the number of deleted data files. */ diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java index 1f32bfe016fe..0d2eb45de95b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java @@ -131,7 +131,8 @@ private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) { } else { // validate there is an ancestor of toSnapshotIdInclusive where parent is fromSnapshotId Preconditions.checkArgument( - SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, fromSnapshotId), + SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, fromSnapshotId) + || fromSnapshotId.equals(toSnapshotIdInclusive), "Starting snapshot (exclusive) %s is not a parent ancestor of end snapshot %s", fromSnapshotId, toSnapshotIdInclusive); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 207207e78c1d..1cdae2e7e853 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -171,6 +171,7 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu .executeWith(executorService()) .retry(3) .stopRetryOn(FileNotFoundException.class) + .suppressFailureWhenFinished() .onFailure( (f, e) -> { LOG.error("Failure during bulk delete on file: {} ", f, e); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java index 2cf375592f7d..7920eae54be5 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java @@ -27,10 +27,12 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -120,6 +122,26 @@ public void testDeletePrefix() { () -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator()); } + @Test + public void testDeleteFiles() { + Path parent = new Path(tempDir.toURI()); + List filesCreated = createRandomFiles(parent, 10); + hadoopFileIO.deleteFiles( + filesCreated.stream().map(Path::toString).collect(Collectors.toList())); + filesCreated.forEach( + file -> Assert.assertFalse(hadoopFileIO.newInputFile(file.toString()).exists())); + } + + @Test + public void testDeleteFilesErrorHandling() { + List filesCreated = + random.ints(2).mapToObj(x -> "fakefsnotreal://file-" + x).collect(Collectors.toList()); + Assert.assertThrows( + "Should throw a BulkDeletionFailure Exceptions when files can't be deleted", + BulkDeletionFailureException.class, + () -> hadoopFileIO.deleteFiles(filesCreated)); + } + @Test public void testHadoopFileIOKryoSerialization() throws IOException { FileIO testHadoopFileIO = new HadoopFileIO(); @@ -142,17 +164,21 @@ public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoun Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties()); } - private void createRandomFiles(Path parent, int count) { + private List createRandomFiles(Path parent, int count) { + List paths = Lists.newArrayList(); random .ints(count) .parallel() .forEach( i -> { try { - fs.createNewFile(new Path(parent, "file-" + i)); + Path path = new Path(parent, "file-" + i); + paths.add(path); + fs.createNewFile(path); } catch (IOException e) { throw new UncheckedIOException(e); } }); + return paths; } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 0c151800bdce..ce645ca12835 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -51,6 +51,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.ClosingIterator; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -257,8 +258,7 @@ protected DeleteSummary deleteFiles( return summary; } - protected DeleteSummary deleteFiles( - Consumer> bulkDeleteFunc, Iterator files) { + protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator files) { DeleteSummary summary = new DeleteSummary(); Iterator> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE); @@ -277,7 +277,7 @@ protected DeleteSummary deleteFiles( .collect(Collectors.toList()); int failures = 0; try { - bulkDeleteFunc.accept(pathsToDelete); + io.deleteFiles(pathsToDelete); } catch (BulkDeletionFailureException bulkDeletionFailureException) { failures = bulkDeletionFailureException.numberFailedObjects(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 8ddb79402afb..6390b0be3158 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -48,7 +48,6 @@ import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; -import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -121,27 +120,13 @@ public void accept(String file) { } }; - private final Consumer> defaultBulkDelete = - new Consumer>() { - @Override - public void accept(Iterable paths) { - Preconditions.checkArgument( - table.io() instanceof SupportsBulkOperations, - "FileIO {} does not support bulk deletes", - table.io()); - ((SupportsBulkOperations) table.io()).deleteFiles(paths); - } - }; - - private Consumer> bulkDeleteFunc = defaultBulkDelete; - private Map equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT); private Map equalAuthorities = Collections.emptyMap(); private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR; private String location = null; private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3); private Dataset compareToFileList; - private Consumer deleteFunc = defaultDelete; + private Consumer deleteFunc = null; private ExecutorService deleteExecutorService = null; DeleteOrphanFilesSparkAction(SparkSession spark, Table table) { @@ -168,12 +153,6 @@ public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorSe return this; } - @Override - public DeleteOrphanFiles deleteBulkWith(Consumer> newBulkDeleteFunc) { - this.bulkDeleteFunc = newBulkDeleteFunc; - return this; - } - @Override public DeleteOrphanFilesSparkAction prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) { this.prefixMismatchMode = newPrefixMismatchMode; @@ -268,21 +247,22 @@ private DeleteOrphanFiles.Result doExecute() { List orphanFiles = findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode); - if (table.io() instanceof SupportsBulkOperations) { - try { - bulkDeleteFunc.accept(orphanFiles); - } catch (BulkDeletionFailureException bulkDeletionFailureException) { - LOG.warn( - "Bulk delete failed to remove {} files", - bulkDeletionFailureException.numberFailedObjects()); + if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) { + if (deleteFunc == null) { + LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes."); + deleteFunc = defaultDelete; + } else { + LOG.info("Custom delete function provided."); } - } else { + Tasks.foreach(orphanFiles) .noRetry() .executeWith(deleteExecutorService) .suppressFailureWhenFinished() .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) .run(deleteFunc::accept); + } else { + ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles); } return new BaseDeleteOrphanFilesActionResult(orphanFiles); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index c5c6c29ecd04..e1b1b54ced92 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -65,19 +65,7 @@ public void accept(String file) { } }; - private Consumer deleteFunc = defaultDelete; - - private final Consumer> defaultBulkDelete = - new Consumer>() { - @Override - public void accept(Iterable paths) { - Preconditions.checkArgument( - io instanceof SupportsBulkOperations, "FileIO {} does not support bulk deletes", io); - ((SupportsBulkOperations) io).deleteFiles(paths); - } - }; - - private Consumer> bulkDeleteFunc = defaultBulkDelete; + private Consumer deleteFunc = null; private ExecutorService deleteExecutorService = null; private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf()); @@ -109,12 +97,6 @@ public DeleteReachableFilesSparkAction executeDeleteWith(ExecutorService executo return this; } - @Override - public DeleteReachableFiles deleteBulkWith(Consumer> newBulkDeleteFunc) { - this.bulkDeleteFunc = newBulkDeleteFunc; - return this; - } - @Override public Result execute() { Preconditions.checkArgument(io != null, "File IO cannot be null"); @@ -154,12 +136,17 @@ private Dataset reachableFileDS(TableMetadata metadata) { private DeleteReachableFiles.Result deleteFiles(Iterator files) { DeleteSummary summary; - if (io instanceof SupportsBulkOperations) { - LOG.info("Triggering Bulk Delete Operations"); - summary = deleteFiles(bulkDeleteFunc, files); - } else { - LOG.warn("Warning falling back to non-bulk deletes"); + if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) { + if (deleteFunc == null) { + LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes."); + deleteFunc = defaultDelete; + } else { + LOG.info("Custom delete function provided."); + } + summary = deleteFiles(deleteExecutorService, deleteFunc, files); + } else { + summary = deleteFiles((SupportsBulkOperations) io, files); } LOG.info("Deleted {} total files", summary.totalFilesCount()); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index de2557928405..406801636234 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -86,20 +86,7 @@ public void accept(String file) { private final Set expiredSnapshotIds = Sets.newHashSet(); private Long expireOlderThanValue = null; private Integer retainLastValue = null; - private Consumer deleteFunc = defaultDelete; - private final Consumer> defaultBulkDelete = - new Consumer>() { - @Override - public void accept(Iterable paths) { - Preconditions.checkArgument( - ops.io() instanceof SupportsBulkOperations, - "FileIO {} does not support bulk deletes", - ops.io()); - ((SupportsBulkOperations) ops.io()).deleteFiles(paths); - } - }; - - private Consumer> bulkDeleteFunc = defaultBulkDelete; + private Consumer deleteFunc = null; private ExecutorService deleteExecutorService = null; private Dataset expiredFileDS = null; @@ -124,12 +111,6 @@ public ExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorServ return this; } - @Override - public ExpireSnapshots deleteBulkWith(Consumer> newBulkDeleteFunc) { - this.bulkDeleteFunc = newBulkDeleteFunc; - return this; - } - @Override public ExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) { expiredSnapshotIds.add(snapshotId); @@ -286,15 +267,17 @@ private Set findExpiredSnapshotIds( private ExpireSnapshots.Result deleteFiles(Iterator files) { DeleteSummary summary; - if (ops.io() instanceof SupportsBulkOperations) { - LOG.info("Triggering Bulk Delete Operations"); - summary = deleteFiles(bulkDeleteFunc, files); - } else { - LOG.warn( - "Falling back to non-bulk deletes because the given io {} does not support bulk deletes. Using an IO" - + "with bulk deletes will provide better throughput.", - ops.io()); + if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) { + if (deleteFunc == null) { + LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes."); + deleteFunc = defaultDelete; + } else { + LOG.info("Custom delete function provided."); + } + summary = deleteFiles(deleteExecutorService, deleteFunc, files); + } else { + summary = deleteFiles((SupportsBulkOperations) table.io(), files); } LOG.info("Deleted {} total files", summary.totalFilesCount()); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java index 4f95a73bd729..154e940519d1 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java @@ -21,6 +21,10 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import java.io.File; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.StreamSupport; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; @@ -40,8 +44,11 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -152,6 +159,55 @@ private void checkRemoveFilesResults( results.deletedOtherFilesCount()); } + @Test + public void dataFilesCleanupWithParallelTasks() { + table.newFastAppend().appendFile(FILE_A).commit(); + + table.newFastAppend().appendFile(FILE_B).commit(); + + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); + + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); + + Set deletedFiles = ConcurrentHashMap.newKeySet(); + Set deleteThreads = ConcurrentHashMap.newKeySet(); + AtomicInteger deleteThreadsIndex = new AtomicInteger(0); + + DeleteReachableFiles.Result result = + sparkActions() + .deleteReachableFiles(metadataLocation(table)) + .io(table.io()) + .executeDeleteWith( + Executors.newFixedThreadPool( + 4, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("remove-files-" + deleteThreadsIndex.getAndIncrement()); + thread.setDaemon( + true); // daemon threads will be terminated abruptly when the JVM exits + return thread; + })) + .deleteWith( + s -> { + deleteThreads.add(Thread.currentThread().getName()); + deletedFiles.add(s); + }) + .execute(); + + // Verifies that the delete methods ran in the threads created by the provided ExecutorService + // ThreadFactory + Assert.assertEquals( + deleteThreads, + Sets.newHashSet("remove-files-0", "remove-files-1", "remove-files-2", "remove-files-3")); + + Lists.newArrayList(FILE_A, FILE_B, FILE_C, FILE_D) + .forEach( + file -> + Assert.assertTrue( + "FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()))); + checkRemoveFilesResults(4L, 0, 0, 6L, 4L, 6, result); + } + @Test public void testWithExpiringDanglingStageCommit() { table.location(); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java index e4a4951f9442..7004c6f8e079 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java @@ -21,8 +21,12 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; @@ -44,6 +48,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -190,6 +195,57 @@ public void testFilesCleaned() throws Exception { checkExpirationResults(1L, 0L, 0L, 1L, 2L, results); } + @Test + public void dataFilesCleanupWithParallelTasks() throws IOException { + + table.newFastAppend().appendFile(FILE_A).commit(); + + table.newFastAppend().appendFile(FILE_B).commit(); + + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)).commit(); + + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)).commit(); + + long t4 = rightAfterSnapshot(); + + Set deletedFiles = Sets.newHashSet(); + Set deleteThreads = ConcurrentHashMap.newKeySet(); + AtomicInteger deleteThreadsIndex = new AtomicInteger(0); + + ExpireSnapshots.Result result = + SparkActions.get() + .expireSnapshots(table) + .executeDeleteWith( + Executors.newFixedThreadPool( + 4, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement()); + thread.setDaemon( + true); // daemon threads will be terminated abruptly when the JVM exits + return thread; + })) + .expireOlderThan(t4) + .deleteWith( + s -> { + deleteThreads.add(Thread.currentThread().getName()); + deletedFiles.add(s); + }) + .execute(); + + // Verifies that the delete methods ran in the threads created by the provided ExecutorService + // ThreadFactory + Assert.assertEquals( + deleteThreads, + Sets.newHashSet( + "remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3")); + + Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); + Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); + + checkExpirationResults(2L, 0L, 0L, 3L, 3L, result); + } + @Test public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { table.newFastAppend().appendFile(FILE_A).commit(); @@ -497,7 +553,7 @@ public void testScanExpiredManifestInValidSnapshotAppend() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(t3) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); @@ -526,7 +582,7 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(t3) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); @@ -558,7 +614,7 @@ public void testWithExpiringDanglingStageCommit() { ExpireSnapshots.Result result = SparkActions.get() .expireSnapshots(table) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .expireOlderThan(snapshotB.timestampMillis() + 1) .execute(); @@ -630,7 +686,7 @@ public void testWithCherryPickTableSnapshot() { ExpireSnapshots.Result result = SparkActions.get() .expireSnapshots(table) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .expireOlderThan(snapshotC.timestampMillis() + 1) .execute(); @@ -679,7 +735,7 @@ public void testWithExpiringStagedThenCherrypick() { ExpireSnapshots.Result firstResult = SparkActions.get() .expireSnapshots(table) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .expireSnapshotId(snapshotB.snapshotId()) .execute(); @@ -699,7 +755,7 @@ public void testWithExpiringStagedThenCherrypick() { ExpireSnapshots.Result secondResult = SparkActions.get() .expireSnapshots(table) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .expireOlderThan(table.currentSnapshot().timestampMillis() + 1) .execute(); @@ -736,7 +792,7 @@ public void testExpireOlderThan() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertEquals( @@ -785,7 +841,7 @@ public void testExpireOlderThanWithDelete() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertEquals( @@ -858,7 +914,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertEquals( @@ -920,7 +976,7 @@ public void testExpireOlderThanWithRollback() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertEquals( @@ -975,7 +1031,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(tAfterCommits) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertEquals( @@ -1034,7 +1090,7 @@ public void testExpireOlderThanWithDeleteFile() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(afterAllDeleted) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Set expectedDeletes = @@ -1076,7 +1132,7 @@ public void testExpireOnEmptyTable() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(System.currentTimeMillis()) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); checkExpirationResults(0, 0, 0, 0, 0, result); @@ -1256,7 +1312,7 @@ public void textExpireAllCheckFilesDeleted(int dataFilesExpired, int dataFilesRe SparkActions.get() .expireSnapshots(table) .expireOlderThan(end) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); Assert.assertEquals( @@ -1285,7 +1341,7 @@ public void testExpireSomeCheckFilesDeleted() { SparkActions.get() .expireSnapshots(table) .expireOlderThan(after) - .deleteBulkWith(files -> files.forEach(deletedFiles::add)) + .deleteWith(deletedFiles::add) .execute(); // C, D should be retained (live) From 01db68d9412f7a780c9100e482abe1c20ed2a148 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Fri, 3 Feb 2023 17:27:24 -0600 Subject: [PATCH 06/12] Accidental changewq --- core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java index 0d2eb45de95b..1f32bfe016fe 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java @@ -131,8 +131,7 @@ private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) { } else { // validate there is an ancestor of toSnapshotIdInclusive where parent is fromSnapshotId Preconditions.checkArgument( - SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, fromSnapshotId) - || fromSnapshotId.equals(toSnapshotIdInclusive), + SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive, fromSnapshotId), "Starting snapshot (exclusive) %s is not a parent ancestor of end snapshot %s", fromSnapshotId, toSnapshotIdInclusive); From c253c519545a8a5d404a541fee71f57e476d817c Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 8 Feb 2023 12:48:00 -0600 Subject: [PATCH 07/12] WIP --- .../iceberg/actions/ExpireSnapshots.java | 2 - .../procedures/ExpireSnapshotsProcedure.java | 4 +- .../RemoveOrphanFilesProcedure.java | 4 +- .../actions/TestExpireSnapshotsAction.java | 45 +++++++++++++------ 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index b0d3a9d212c6..8274f11042bc 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -77,7 +77,6 @@ public interface ExpireSnapshots extends Action deleteFunc); /** @@ -95,7 +94,6 @@ public interface ExpireSnapshots extends Action... files){ + temp. + } + @Before public void setupTableLocation() throws Exception { this.tableDir = temp.newFolder(); @@ -1352,4 +1364,9 @@ public void testExpireSomeCheckFilesDeleted() { Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString())); Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString())); } + + @Test + public void testNonCustomDelete() { + + } } From c6b3cb0f4e6f2d4d5bd24f8b2b884da97e48a227 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 28 Feb 2023 12:02:01 -0600 Subject: [PATCH 08/12] Reviewer Comments --- .../iceberg/actions/DeleteOrphanFiles.java | 5 +- .../iceberg/actions/DeleteReachableFiles.java | 5 +- .../apache/iceberg/hadoop/HadoopFileIO.java | 5 +- .../spark/actions/BaseSparkAction.java | 51 +++++++++++-------- .../actions/DeleteOrphanFilesSparkAction.java | 23 +++++---- 5 files changed, 50 insertions(+), 39 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index 02e0c8e6593b..279a44fa5580 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -74,9 +75,9 @@ public interface DeleteOrphanFiles extends ActionIf this method is not called and bulk deletes are not supported, orphaned manifests and data * files will still be deleted in the current thread. diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java index d310ca8db9e9..4704fafe7835 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; /** * An action that deletes all files referenced by a table metadata file. @@ -46,9 +47,9 @@ public interface DeleteReachableFiles /** * Passes an alternative executor service that will be used for files removal. This service will * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the - * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}. + * FileIO does not {@link SupportsBulkOperations support bulk deletes}. * Otherwise, parallelism should be controlled by the IO specific {@link - * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method. + * SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method. * * @param executorService the service to use * @return this for method chaining diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 1cdae2e7e853..47c2ad6b5236 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -51,7 +51,8 @@ public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations { private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class); - private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism"; + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; private static volatile ExecutorService executorService; @@ -196,7 +197,7 @@ private ExecutorService executorService() { synchronized (HadoopFileIO.class) { if (executorService == null) { executorService = - ThreadPools.newWorkerPool("iceberg-hadoopfileio-delete", deleteThreads()); + ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads()); } } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index ce645ca12835..a32a0ca5d4c9 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -22,6 +22,7 @@ import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.lit; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,9 +56,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableListMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.JobGroupUtils; import org.apache.iceberg.spark.SparkTableUtil; @@ -260,36 +264,33 @@ protected DeleteSummary deleteFiles( protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator files) { DeleteSummary summary = new DeleteSummary(); - Iterator> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE); + Iterator> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE); - Tasks.foreach(groupedIterator) + Tasks.foreach(fileGroups) .suppressFailureWhenFinished() - .run( - fileList -> { - Map> filesByType = - fileList.stream().collect(Collectors.groupingBy(FileInfo::getType)); - filesByType.entrySet().stream() - .forEach( - entry -> { - List pathsToDelete = - entry.getValue().stream() - .map(FileInfo::getPath) - .collect(Collectors.toList()); - int failures = 0; - try { - io.deleteFiles(pathsToDelete); - } catch (BulkDeletionFailureException bulkDeletionFailureException) { - failures = bulkDeletionFailureException.numberFailedObjects(); - } - summary.deletedFiles(entry.getKey(), pathsToDelete.size() - failures); - }); - }); + .run(fileGroup -> deleteFileGroup(fileGroup, io, summary)); LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount()); return summary; } + private static void deleteFileGroup(List fileGroup, SupportsBulkOperations io, DeleteSummary summary) { + ImmutableListMultimap filesByType = Multimaps.index(fileGroup, FileInfo::getType); + ListMultimap pathsByType = Multimaps.transformValues(filesByType, FileInfo::getPath); + for (Map.Entry> entry : pathsByType.asMap().entrySet()) { + String type = entry.getKey(); + Collection paths = entry.getValue(); + int failures = 0; + try { + io.deleteFiles(paths); + } catch (BulkDeletionFailureException bulkDeletionFailureException) { + failures = bulkDeletionFailureException.numberFailedObjects(); + } + summary.deletedFiles(entry.getKey(), paths.size() - failures); + } + } + static class DeleteSummary { private final AtomicLong dataFilesCount = new AtomicLong(0L); private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L); @@ -301,16 +302,22 @@ static class DeleteSummary { public void deletedFiles(String type, int numFiles) { if (FileContent.DATA.name().equalsIgnoreCase(type)) { dataFilesCount.addAndGet(numFiles); + } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) { positionDeleteFilesCount.addAndGet(numFiles); + } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) { equalityDeleteFilesCount.addAndGet(numFiles); + } else if (MANIFEST.equalsIgnoreCase(type)) { manifestsCount.addAndGet(numFiles); + } else if (MANIFEST_LIST.equalsIgnoreCase(type)) { manifestListsCount.addAndGet(numFiles); + } else if (OTHERS.equalsIgnoreCase(type)) { otherFilesCount.addAndGet(numFiles); + } else { throw new ValidationException("Illegal file type: %s", type); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 6390b0be3158..64c1bea76256 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -247,22 +247,23 @@ private DeleteOrphanFiles.Result doExecute() { List orphanFiles = findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode); - if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) { - if (deleteFunc == null) { - LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes."); + if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { + ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles); + + } else { + if (deleteFunc != null) { + LOG.info("Bulk Deletes are not Supported by {}, using non-bulk deletes", table.io()); deleteFunc = defaultDelete; } else { - LOG.info("Custom delete function provided."); + LOG.info("Custom delete function provided, using non-bulk deletes"); } Tasks.foreach(orphanFiles) - .noRetry() - .executeWith(deleteExecutorService) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) - .run(deleteFunc::accept); - } else { - ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles); + .noRetry() + .executeWith(deleteExecutorService) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) + .run(deleteFunc::accept); } return new BaseDeleteOrphanFilesActionResult(orphanFiles); From cb28cbaaeda9dfe6dfff3c9133f6fba981ede092 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 28 Feb 2023 12:35:20 -0600 Subject: [PATCH 09/12] Spotless --- .../iceberg/actions/DeleteOrphanFiles.java | 5 +-- .../iceberg/actions/DeleteReachableFiles.java | 6 +-- .../apache/iceberg/hadoop/HadoopFileIO.java | 3 +- .../spark/actions/BaseSparkAction.java | 10 +++-- .../actions/DeleteOrphanFilesSparkAction.java | 10 ++--- .../actions/TestExpireSnapshotsAction.java | 45 ++++++------------- 6 files changed, 31 insertions(+), 48 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index 279a44fa5580..4e8f80fa833f 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -74,9 +74,8 @@ public interface DeleteOrphanFiles extends ActionIf this method is not called and bulk deletes are not supported, orphaned manifests and data diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java index 4704fafe7835..d4ebb5a2007c 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java @@ -47,9 +47,9 @@ public interface DeleteReachableFiles /** * Passes an alternative executor service that will be used for files removal. This service will * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the - * FileIO does not {@link SupportsBulkOperations support bulk deletes}. - * Otherwise, parallelism should be controlled by the IO specific {@link - * SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method. + * FileIO does not {@link SupportsBulkOperations support bulk deletes}. Otherwise, parallelism + * should be controlled by the IO specific {@link SupportsBulkOperations#deleteFiles(Iterable) + * deleteFiles} method. * * @param executorService the service to use * @return this for method chaining diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 47c2ad6b5236..314d64902099 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -196,8 +196,7 @@ private ExecutorService executorService() { if (executorService == null) { synchronized (HadoopFileIO.class) { if (executorService == null) { - executorService = - ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads()); + executorService = ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads()); } } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index a32a0ca5d4c9..5236e30fb120 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.iceberg.AllManifestsTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; @@ -275,9 +274,12 @@ protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator fileGroup, SupportsBulkOperations io, DeleteSummary summary) { - ImmutableListMultimap filesByType = Multimaps.index(fileGroup, FileInfo::getType); - ListMultimap pathsByType = Multimaps.transformValues(filesByType, FileInfo::getPath); + private static void deleteFileGroup( + List fileGroup, SupportsBulkOperations io, DeleteSummary summary) { + ImmutableListMultimap filesByType = + Multimaps.index(fileGroup, FileInfo::getType); + ListMultimap pathsByType = + Multimaps.transformValues(filesByType, FileInfo::getPath); for (Map.Entry> entry : pathsByType.asMap().entrySet()) { String type = entry.getKey(); Collection paths = entry.getValue(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 64c1bea76256..300283f9596e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -259,11 +259,11 @@ private DeleteOrphanFiles.Result doExecute() { } Tasks.foreach(orphanFiles) - .noRetry() - .executeWith(deleteExecutorService) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) - .run(deleteFunc::accept); + .noRetry() + .executeWith(deleteExecutorService) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) + .run(deleteFunc::accept); } return new BaseDeleteOrphanFilesActionResult(orphanFiles); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java index d79d03f8fbbc..7004c6f8e079 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java @@ -75,69 +75,57 @@ public class TestExpireSnapshotsAction extends SparkTestBase { private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); - @Rule public TemporaryFolder temp = new TemporaryFolder(); - - DataFile FILE_A; - DataFile FILE_B; - DataFile FILE_C; - DataFile FILE_D; - DeleteFile FILE_A_POS_DELETES; - DeleteFile FILE_A_EQ_DELETES; - - private DataFile - + static final DataFile FILE_A = DataFiles.builder(SPEC) - .withPath(temp.toString() + "/path/to/data-a.parquet") + .withPath("/path/to/data-a.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=0") // easy way to set partition data for now .withRecordCount(1) .build(); - final DataFile FILE_B = + static final DataFile FILE_B = DataFiles.builder(SPEC) - .withPath(temp.toString() + "/path/to/data-b.parquet") + .withPath("/path/to/data-b.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=1") // easy way to set partition data for now .withRecordCount(1) .build(); - final DataFile FILE_C = + static final DataFile FILE_C = DataFiles.builder(SPEC) - .withPath(temp.toString() + "/path/to/data-c.parquet") + .withPath("/path/to/data-c.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=2") // easy way to set partition data for now .withRecordCount(1) .build(); - final DataFile FILE_D = + static final DataFile FILE_D = DataFiles.builder(SPEC) - .withPath(temp.toString() + "/path/to/data-d.parquet") + .withPath("/path/to/data-d.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=3") // easy way to set partition data for now .withRecordCount(1) .build(); - final DeleteFile FILE_A_POS_DELETES = + static final DeleteFile FILE_A_POS_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofPositionDeletes() - .withPath(temp.toString() + "/path/to/data-a-pos-deletes.parquet") + .withPath("/path/to/data-a-pos-deletes.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=0") // easy way to set partition data for now .withRecordCount(1) .build(); - final DeleteFile FILE_A_EQ_DELETES = + static final DeleteFile FILE_A_EQ_DELETES = FileMetadata.deleteFileBuilder(SPEC) .ofEqualityDeletes() - .withPath(temp.toString() + "/path/to/data-a-eq-deletes.parquet") + .withPath("/path/to/data-a-eq-deletes.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=0") // easy way to set partition data for now .withRecordCount(1) .build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); + private File tableDir; private String tableLocation; private Table table; - private createBackingFiles(List... files){ - temp. - } - @Before public void setupTableLocation() throws Exception { this.tableDir = temp.newFolder(); @@ -1364,9 +1352,4 @@ public void testExpireSomeCheckFilesDeleted() { Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString())); Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString())); } - - @Test - public void testNonCustomDelete() { - - } } From 0f032ca149529f9e2fa7a00a34bb01100c2af4c7 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Tue, 28 Feb 2023 15:31:26 -0600 Subject: [PATCH 10/12] Fix Delete Bug --- .../iceberg/spark/actions/DeleteOrphanFilesSparkAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 300283f9596e..1c3b7dc3504b 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -251,7 +251,7 @@ private DeleteOrphanFiles.Result doExecute() { ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles); } else { - if (deleteFunc != null) { + if (deleteFunc == null) { LOG.info("Bulk Deletes are not Supported by {}, using non-bulk deletes", table.io()); deleteFunc = defaultDelete; } else { From 66913e6f3a1ddff0fb2c4c419c5483421f772710 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Wed, 1 Mar 2023 12:03:18 -0600 Subject: [PATCH 11/12] Review Comments --- .../iceberg/actions/ExpireSnapshots.java | 7 +-- .../apache/iceberg/hadoop/HadoopFileIO.java | 9 ++-- .../spark/actions/BaseSparkAction.java | 14 +++--- .../actions/DeleteOrphanFilesSparkAction.java | 45 ++++++++++--------- .../DeleteReachableFilesSparkAction.java | 25 ++++------- .../actions/ExpireSnapshotsSparkAction.java | 17 +++---- .../procedures/ExpireSnapshotsProcedure.java | 19 +++++--- .../RemoveOrphanFilesProcedure.java | 20 +++++---- 8 files changed, 80 insertions(+), 76 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index 8274f11042bc..0e02f4bec964 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.io.SupportsBulkOperations; /** * An action that expires snapshots in a table. @@ -82,9 +83,9 @@ public interface ExpireSnapshots extends ActionIf this method is not called and bulk deletes are not supported, unnecessary manifests and * content files will still be deleted in the current thread. diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 314d64902099..04ead0bd6791 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -53,6 +53,7 @@ public class HadoopFileIO private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class); private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; private static volatile ExecutorService executorService; @@ -170,7 +171,7 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu AtomicInteger failureCount = new AtomicInteger(0); Tasks.foreach(pathsToDelete) .executeWith(executorService()) - .retry(3) + .retry(DELETE_RETRY_ATTEMPTS) .stopRetryOn(FileNotFoundException.class) .suppressFailureWhenFinished() .onFailure( @@ -186,10 +187,8 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu } private int deleteThreads() { - return conf() - .getInt( - DELETE_FILE_PARALLELISM, - Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE); + int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE; + return conf().getInt(DELETE_FILE_PARALLELISM, defaultValue); } private ExecutorService executorService() { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 5236e30fb120..1b285e8caca8 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -55,7 +55,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableListMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -269,27 +268,26 @@ protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator deleteFileGroup(fileGroup, io, summary)); - LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount()); - return summary; } private static void deleteFileGroup( List fileGroup, SupportsBulkOperations io, DeleteSummary summary) { - ImmutableListMultimap filesByType = - Multimaps.index(fileGroup, FileInfo::getType); + + ListMultimap filesByType = Multimaps.index(fileGroup, FileInfo::getType); ListMultimap pathsByType = Multimaps.transformValues(filesByType, FileInfo::getPath); + for (Map.Entry> entry : pathsByType.asMap().entrySet()) { String type = entry.getKey(); Collection paths = entry.getValue(); int failures = 0; try { io.deleteFiles(paths); - } catch (BulkDeletionFailureException bulkDeletionFailureException) { - failures = bulkDeletionFailureException.numberFailedObjects(); + } catch (BulkDeletionFailureException e) { + failures = e.numberFailedObjects(); } - summary.deletedFiles(entry.getKey(), paths.size() - failures); + summary.deletedFiles(type, paths.size() - failures); } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index 1c3b7dc3504b..c5b2f4cea343 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -48,6 +48,7 @@ import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -112,14 +113,6 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction defaultDelete = - new Consumer() { - @Override - public void accept(String file) { - table.io().deleteFile(file); - } - }; - private Map equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT); private Map equalAuthorities = Collections.emptyMap(); private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR; @@ -240,6 +233,16 @@ private String jobDesc() { return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name()); } + private void deleteFiles(SupportsBulkOperations io, List paths) { + try { + io.deleteFiles(paths); + LOG.info("Deleted {} files using bulk deletes", paths.size()); + } catch (BulkDeletionFailureException e) { + int deletedFilesCount = paths.size() - e.numberFailedObjects(); + LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size()); + } + } + private DeleteOrphanFiles.Result doExecute() { Dataset actualFileIdentDS = actualFileIdentDS(); Dataset validFileIdentDS = validFileIdentDS(); @@ -248,22 +251,24 @@ private DeleteOrphanFiles.Result doExecute() { findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode); if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { - ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles); - + deleteFiles((SupportsBulkOperations) table.io(), orphanFiles); } else { + + Tasks.Builder deleteTasks = + Tasks.foreach(orphanFiles) + .noRetry() + .executeWith(deleteExecutorService) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); + if (deleteFunc == null) { - LOG.info("Bulk Deletes are not Supported by {}, using non-bulk deletes", table.io()); - deleteFunc = defaultDelete; + LOG.info( + "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io()); + deleteTasks.run(table.io()::deleteFile); } else { - LOG.info("Custom delete function provided, using non-bulk deletes"); + LOG.info("Custom delete function provided. Using non-bulk deletes"); + deleteTasks.run(deleteFunc::accept); } - - Tasks.foreach(orphanFiles) - .noRetry() - .executeWith(deleteExecutorService) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)) - .run(deleteFunc::accept); } return new BaseDeleteOrphanFilesActionResult(orphanFiles); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index e1b1b54ced92..6b716287bf73 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -56,15 +56,6 @@ public class DeleteReachableFilesSparkAction private final String metadataFileLocation; - @Deprecated - private final Consumer defaultDelete = - new Consumer() { - @Override - public void accept(String file) { - io.deleteFile(file); - } - }; - private Consumer deleteFunc = null; private ExecutorService deleteExecutorService = null; private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf()); @@ -136,17 +127,17 @@ private Dataset reachableFileDS(TableMetadata metadata) { private DeleteReachableFiles.Result deleteFiles(Iterator files) { DeleteSummary summary; - if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) { + if (deleteFunc == null && io instanceof SupportsBulkOperations) { + summary = deleteFiles((SupportsBulkOperations) io, files); + } else { + if (deleteFunc == null) { - LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes."); - deleteFunc = defaultDelete; + LOG.info("Table IO {} does not support bulk operations. Using non-bulk deletes.", io); + summary = deleteFiles(deleteExecutorService, io::deleteFile, files); } else { - LOG.info("Custom delete function provided."); + LOG.info("Custom delete function provided. Using non-bulk deletes"); + summary = deleteFiles(deleteExecutorService, deleteFunc, files); } - - summary = deleteFiles(deleteExecutorService, deleteFunc, files); - } else { - summary = deleteFiles((SupportsBulkOperations) io, files); } LOG.info("Deleted {} total files", summary.totalFilesCount()); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index 406801636234..714f5bfb7b7d 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -267,17 +267,18 @@ private Set findExpiredSnapshotIds( private ExpireSnapshots.Result deleteFiles(Iterator files) { DeleteSummary summary; - if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) { + if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) { + summary = deleteFiles((SupportsBulkOperations) table.io(), files); + } else { + if (deleteFunc == null) { - LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes."); - deleteFunc = defaultDelete; + LOG.info( + "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io()); + summary = deleteFiles(deleteExecutorService, table.io()::deleteFile, files); } else { - LOG.info("Custom delete function provided."); + LOG.info("Custom delete function provided. Using non-bulk deletes"); + summary = deleteFiles(deleteExecutorService, deleteFunc, files); } - - summary = deleteFiles(deleteExecutorService, deleteFunc, files); - } else { - summary = deleteFiles((SupportsBulkOperations) table.io(), files); } LOG.info("Deleted {} total files", summary.totalFilesCount()); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index b1cb86657875..0c2f1b55211c 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -20,6 +20,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.actions.ExpireSnapshots; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction; import org.apache.iceberg.spark.actions.SparkActions; @@ -97,12 +98,6 @@ public InternalRow[] call(InternalRow args) { Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1)); Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2); Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3); - if (maxConcurrentDeletes != null) { - LOG.warn( - "Setting {} disables FileIO bulk deletes if they are supported. Parallelism for bulk deletes should " - + "be configured in the FileIO bulk operations. Check the configured FileIO for more information", - PARAMETERS[3].name()); - } Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4); Preconditions.checkArgument( @@ -124,7 +119,17 @@ public InternalRow[] call(InternalRow args) { } if (maxConcurrentDeletes != null) { - action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots")); + if (table.io() instanceof SupportsBulkOperations) { + LOG.warn( + "max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This" + + "table is currently using {} which supports bulk deletes so the parameter will be ignored. " + + "See that IO's documentation to learn how to adjust parallelism for that particular " + + "IO's bulk delete.", + table.io()); + } else { + + action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots")); + } } if (streamResult != null) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index aee6c7bb4d42..b5b27284f655 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -23,6 +23,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -101,13 +102,6 @@ public InternalRow[] call(InternalRow args) { String location = args.isNullAt(2) ? null : args.getString(2); boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3); Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4); - if (maxConcurrentDeletes != null) { - LOG.warn( - "Setting {} disables FileIO bulk deletes if they are supported. Parallelism for bulk deletes should " - + "be configured in the FileIO bulk operations. Check the configured FileIO for more information", - PARAMETERS[4].name()); - } - String fileListView = args.isNullAt(5) ? null : args.getString(5); Preconditions.checkArgument( @@ -164,7 +158,17 @@ public InternalRow[] call(InternalRow args) { } if (maxConcurrentDeletes != null) { - action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans")); + if (table.io() instanceof SupportsBulkOperations) { + LOG.warn( + "max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This" + + "table is currently using {} which supports bulk deletes so the parameter will be ignored. " + + "See that IO's documentation to learn how to adjust parallelism for that particular " + + "IO's bulk delete.", + table.io()); + } else { + + action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans")); + } } if (fileListView != null) { From e81edc0e5f3c2e5d34f6668b866201a327a07192 Mon Sep 17 00:00:00 2001 From: Russell_Spitzer Date: Thu, 2 Mar 2023 10:10:52 -0600 Subject: [PATCH 12/12] Last CommentsWq --- .../spark/actions/DeleteOrphanFilesSparkAction.java | 3 ++- .../spark/actions/DeleteReachableFilesSparkAction.java | 4 +++- .../spark/actions/ExpireSnapshotsSparkAction.java | 10 ++-------- .../spark/procedures/ExpireSnapshotsProcedure.java | 2 +- .../spark/procedures/RemoveOrphanFilesProcedure.java | 2 +- 5 files changed, 9 insertions(+), 12 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java index c5b2f4cea343..ea73403c2e60 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java @@ -263,7 +263,8 @@ private DeleteOrphanFiles.Result doExecute() { if (deleteFunc == null) { LOG.info( - "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io()); + "Table IO {} does not support bulk operations. Using non-bulk deletes.", + table.io().getClass().getName()); deleteTasks.run(table.io()::deleteFile); } else { LOG.info("Custom delete function provided. Using non-bulk deletes"); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java index 6b716287bf73..cdc60a659d7c 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java @@ -132,7 +132,9 @@ private DeleteReachableFiles.Result deleteFiles(Iterator files) { } else { if (deleteFunc == null) { - LOG.info("Table IO {} does not support bulk operations. Using non-bulk deletes.", io); + LOG.info( + "Table IO {} does not support bulk operations. Using non-bulk deletes.", + io.getClass().getName()); summary = deleteFiles(deleteExecutorService, io::deleteFile, files); } else { LOG.info("Custom delete function provided. Using non-bulk deletes"); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index 714f5bfb7b7d..95e153a9a5a6 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -75,13 +75,6 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction defaultDelete = - new Consumer() { - @Override - public void accept(String file) { - ops.io().deleteFile(file); - } - }; private final Set expiredSnapshotIds = Sets.newHashSet(); private Long expireOlderThanValue = null; @@ -273,7 +266,8 @@ private ExpireSnapshots.Result deleteFiles(Iterator files) { if (deleteFunc == null) { LOG.info( - "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io()); + "Table IO {} does not support bulk operations. Using non-bulk deletes.", + table.io().getClass().getName()); summary = deleteFiles(deleteExecutorService, table.io()::deleteFile, files); } else { LOG.info("Custom delete function provided. Using non-bulk deletes"); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index 0c2f1b55211c..8d979ea05460 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -125,7 +125,7 @@ public InternalRow[] call(InternalRow args) { + "table is currently using {} which supports bulk deletes so the parameter will be ignored. " + "See that IO's documentation to learn how to adjust parallelism for that particular " + "IO's bulk delete.", - table.io()); + table.io().getClass().getName()); } else { action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots")); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index b5b27284f655..6e66ea2629b8 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -164,7 +164,7 @@ public InternalRow[] call(InternalRow args) { + "table is currently using {} which supports bulk deletes so the parameter will be ignored. " + "See that IO's documentation to learn how to adjust parallelism for that particular " + "IO's bulk delete.", - table.io()); + table.io().getClass().getName()); } else { action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));