diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index d2a2a48ed81b..d4e4501529f4 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -32,16 +32,19 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -411,17 +414,12 @@ private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metada
             TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
 
     if (deleteAfterCommit) {
-      Set<MetadataLogEntry> removedPreviousMetadataFiles =
-          Sets.newHashSet(base.previousFiles());
+      Set<MetadataLogEntry> removedPreviousMetadataFiles = Sets.newHashSet(base.previousFiles());
       removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
-      Tasks.foreach(removedPreviousMetadataFiles)
-          .noRetry()
-          .suppressFailureWhenFinished()
-          .onFailure(
-              (previousMetadataFile, exc) ->
-                  LOG.warn(
-                      "Delete failed for previous metadata file: {}", previousMetadataFile, exc))
-          .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
+      Iterable<String> deletedFiles =
+          Iterables.transform(removedPreviousMetadataFiles, MetadataLogEntry::file);
+
+      FileIOUtil.bulkDelete(io(), deletedFiles).name("previous metadata file").execute();
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 38dfa0aaf3ee..c98b81c5a844 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -41,8 +41,10 @@
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -62,12 +64,12 @@ enum TransactionType {
   private final TableOperations ops;
   private final TransactionTable transactionTable;
   private final TableOperations transactionOps;
-  private final List<PendingUpdate> updates;
+  private final List<PendingUpdate<?>> updates;
   private final Set<Long> intermediateSnapshotIds;
   private final Set<String> deletedFiles =
       Sets.newHashSet(); // keep track of files deleted in the most recent commit
   private final Consumer<String> enqueueDelete = deletedFiles::add;
-  private TransactionType type;
+  private final TransactionType type;
   private TableMetadata base;
   private TableMetadata current;
   private boolean hasLastOpCommitted;
@@ -287,7 +289,7 @@ private void commitCreateTransaction() {
           .run(
               update -> {
                 if (update instanceof SnapshotProducer) {
-                  ((SnapshotProducer) update).cleanAll();
+                  ((SnapshotProducer<?>) update).cleanAll();
                 }
               });
 
@@ -295,12 +297,8 @@ private void commitCreateTransaction() {
     } finally {
       // create table never needs to retry because the table has no previous state. because retries
-      // are not a
-      // concern, it is safe to delete all of the deleted files from individual operations
-      Tasks.foreach(deletedFiles)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
-          .run(ops.io()::deleteFile);
+      // are not a concern, it is safe to delete all the deleted files from individual operations
+      FileIOUtil.bulkDelete(ops.io(), deletedFiles).name("uncommitted file").execute();
     }
   }
 
@@ -329,7 +327,7 @@ private void commitReplaceTransaction(boolean orCreate) {
       }
     }
 
-    // because this is a replace table, it will always completely replace the table
+    // because this is a replacement table, it will always completely replace the table
     // metadata. even if it was just updated.
     if (base != underlyingOps.current()) {
       this.base = underlyingOps.current(); // just refreshed
@@ -348,7 +346,7 @@ private void commitReplaceTransaction(boolean orCreate) {
           .run(
               update -> {
                 if (update instanceof SnapshotProducer) {
-                  ((SnapshotProducer) update).cleanAll();
+                  ((SnapshotProducer<?>) update).cleanAll();
                 }
               });
 
@@ -356,12 +354,9 @@ private void commitReplaceTransaction(boolean orCreate) {
     } finally {
       // replace table never needs to retry because the table state is completely replaced. because
-      // retries are not
-      // a concern, it is safe to delete all of the deleted files from individual operations
-      Tasks.foreach(deletedFiles)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
-          .run(ops.io()::deleteFile);
+      // retries are not a concern, it is safe to delete all the deleted files from individual
+      // operations
+      FileIOUtil.bulkDelete(ops.io(), deletedFiles).name("uncommitted file").execute();
     }
   }
 
@@ -421,16 +416,10 @@ private void commitSimpleTransaction() {
     // is retried.
     Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
     if (committedFiles != null) {
-      // delete all of the files that were deleted in the most recent set of operation commits
-      Tasks.foreach(deletedFiles)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
-          .run(
-              path -> {
-                if (!committedFiles.contains(path)) {
-                  ops.io().deleteFile(path);
-                }
-              });
+      // delete all the files that were deleted in the most recent set of operation commits
+      Iterable<String> uncommittedFiles =
+          Iterables.filter(deletedFiles, path -> !committedFiles.contains(path));
+      FileIOUtil.bulkDelete(ops.io(), uncommittedFiles).name("uncommitted file").execute();
     } else {
       LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
     }
@@ -447,15 +436,12 @@ private void cleanUpOnCommitFailure() {
         .run(
             update -> {
               if (update instanceof SnapshotProducer) {
-                ((SnapshotProducer) update).cleanAll();
+                ((SnapshotProducer<?>) update).cleanAll();
               }
             });
 
     // delete all files that were cleaned up
-    Tasks.foreach(deletedFiles)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
-        .run(ops.io()::deleteFile);
+    FileIOUtil.bulkDelete(ops.io(), deletedFiles).name("uncommitted file").execute();
   }
 
   private void applyUpdates(TableOperations underlyingOps) {
@@ -463,7 +449,7 @@ private void applyUpdates(TableOperations underlyingOps) {
     // use refreshed the metadata
     this.base = underlyingOps.current();
     this.current = underlyingOps.current();
-    for (PendingUpdate update : updates) {
+    for (PendingUpdate<?> update : updates) {
       // re-commit each update in the chain to apply it and update current
       try {
         update.commit();
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index 1ce3f73e8f98..2a4366fb01a6 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -22,6 +22,7 @@
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -29,7 +30,6 @@
 import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.FileIO;
@@ -38,8 +38,9 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
 
     LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
 
-    // run all of the deletes
+    // run all the deletes
    boolean gcEnabled =
         PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT);
 
@@ -102,37 +103,25 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
       deleteFiles(io, manifestsToDelete);
     }
 
-    Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+    FileIOUtil.bulkDelete(io, Iterables.transform(manifestsToDelete, ManifestFile::path))
+        .name("manifest")
         .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
-        .run(io::deleteFile);
+        .execute();
 
-    Tasks.foreach(manifestListsToDelete)
+    FileIOUtil.bulkDelete(io, manifestListsToDelete)
+        .name("manifest list")
         .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc))
-        .run(io::deleteFile);
+        .execute();
 
-    Tasks.foreach(
-        Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file))
+    FileIOUtil.bulkDelete(
+            io, Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file))
+        .name("previous metadata file")
         .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure(
-            (metadataFile, exc) ->
-                LOG.warn("Delete failed for previous metadata file: {}", metadataFile, exc))
-        .run(io::deleteFile);
-
-    Tasks.foreach(metadata.metadataFileLocation())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure(
-            (metadataFile, exc) ->
-                LOG.warn("Delete failed for metadata file: {}", metadataFile, exc))
-        .run(io::deleteFile);
+        .execute();
+
+    FileIOUtil.bulkDelete(io, metadata.metadataFileLocation())
+        .name("metadata file")
+        .execute();
   }
 
   @SuppressWarnings("DangerousStringInternUsage")
@@ -141,35 +130,36 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
     Map<String, Boolean> deletedFiles =
         new MapMaker().concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE).weakKeys().makeMap();
 
-    Tasks.foreach(allManifests)
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .onFailure(
-            (item, exc) ->
-                LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
-        .run(
-            manifest -> {
-              try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
-                for (ManifestEntry<?> entry : reader.entries()) {
-                  // intern the file path because the weak key map uses identity (==) instead of
-                  // equals
-                  String path = entry.file().path().toString().intern();
-                  Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
-                  if (alreadyDeleted == null || !alreadyDeleted) {
-                    try {
-                      io.deleteFile(path);
-                    } catch (RuntimeException e) {
-                      // this may happen if the map of deleted files gets cleaned up by gc
-                      LOG.warn("Delete failed for data file: {}", path, e);
-                    }
-                  }
-                }
-              } catch (IOException e) {
-                throw new RuntimeIOException(
-                    e, "Failed to read manifest file: %s", manifest.path());
-              }
-            });
+    Iterable<String> removedFiles =
+        Iterables.concat(
+            Iterables.transform(
+                allManifests,
+                manifest -> {
+                  try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
+                    Iterable<String> paths =
+                        // intern the file path because the weak key map uses identity (==)
+                        // instead of equals
+                        Iterables.transform(
+                            reader.entries(), entry -> entry.file().path().toString().intern());
+                    // materialize the paths while the reader is open: the transformed and
+                    // filtered iterables are lazy and would otherwise read from a closed reader
+                    return Lists.newArrayList(
+                        Iterables.filter(
+                            paths,
+                            path -> {
+                              Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
+                              return alreadyDeleted == null || !alreadyDeleted;
+                            }));
+                  } catch (IOException e) {
+                    throw new UncheckedIOException(
+                        "Failed to read manifest file: " + manifest.path(), e);
+                  }
+                }));
+
+    FileIOUtil.bulkDelete(io, removedFiles)
+        .name("data file")
+        .executeWith(ThreadPools.getWorkerPool())
+        .execute();
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index bd922082eaae..75774609a743 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -39,17 +39,16 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeWrapper;
 import org.apache.iceberg.util.TableScanUtil;
-import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -293,11 +292,7 @@ private void replaceDataFiles(
       throw e;
     } catch (Exception e) {
       LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
-          .noRetry()
-          .suppressFailureWhenFinished()
-          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-          .run(fileIO::deleteFile);
+      FileIOUtil.bulkDeleteFiles(fileIO, addedDataFiles).execute();
       throw e;
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index f6fc53bba12a..5939529d24d4 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -35,7 +35,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.FileIOUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,12 +97,8 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
   public void abortFileGroup(RewriteFileGroup fileGroup) {
     Preconditions.checkState(
         fileGroup.addedFiles() != null, "Cannot abort a fileGroup that was not rewritten");
-
-    Tasks.foreach(fileGroup.addedFiles())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((dataFile, exc) -> LOG.warn("Failed to delete: {}", dataFile.path(), exc))
-        .run(dataFile -> table.io().deleteFile(dataFile.path().toString()));
+    FileIOUtil.bulkDeleteFiles(table.io(), fileGroup.addedFiles()).execute();
   }
 
   public void commitOrClean(Set<RewriteFileGroup> rewriteGroups) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 44936f251495..f9ec11d5bf61 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -45,9 +45,10 @@
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -427,15 +428,13 @@ private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metada
 
     Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
         Sets.newHashSet(base.previousFiles());
     removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
-    Tasks.foreach(removedPreviousMetadataFiles)
+    Iterable<String> removedFiles =
+        Iterables.transform(removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file);
+
+    FileIOUtil.bulkDelete(io(), removedFiles)
+        .name("previous metadata file")
         .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure(
-            (previousMetadataFile, exc) ->
-                LOG.warn(
-                    "Delete failed for previous metadata file: {}", previousMetadataFile, exc))
-        .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
+        .execute();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 55d6d245aa03..0ba37facc024 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -35,11 +35,11 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.iceberg.util.StructProjection;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 
 public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   private final List<DataFile> completedDataFiles = Lists.newArrayList();
@@ -77,11 +77,9 @@ public void abort() throws IOException {
     close();
 
     // clean up files created by this writer
-    Tasks.foreach(Iterables.concat(completedDataFiles, completedDeleteFiles))
+    FileIOUtil.bulkDeleteFiles(io, Iterables.concat(completedDataFiles, completedDeleteFiles))
         .executeWith(ThreadPools.getWorkerPool())
-        .throwFailureWhenFinished()
-        .noRetry()
-        .run(file -> io.deleteFile(file.path().toString()));
+        .execute();
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/util/FileIOUtil.java b/core/src/main/java/org/apache/iceberg/util/FileIOUtil.java
new file mode 100644
index 000000000000..ec170864e86d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/FileIOUtil.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
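+/**
+ * Convenience builders for deleting groups of files through a {@link FileIO}.
+ *
+ * <p>When the {@code FileIO} implements {@link SupportsBulkOperations}, the delete is issued as a
+ * single bulk call; otherwise each file is deleted individually, optionally in parallel on the
+ * executor passed to {@link BulkDeleter#executeWith(ExecutorService)}. On both paths failures are
+ * logged and suppressed rather than rethrown, so callers that must fail when a delete is missed
+ * should not use this helper.
+ *
+ * <p>A typical call site, mirroring the usage in {@code CatalogUtil}:
+ *
+ * <pre>{@code
+ * FileIOUtil.bulkDelete(io, pathsToDelete)
+ *     .name("manifest") // used in failure log messages
+ *     .executeWith(ThreadPools.getWorkerPool()) // only used by the per-file fallback
+ *     .execute();
+ * }</pre>
+ */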
+public class FileIOUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(FileIOUtil.class);
+
+  private FileIOUtil() {}
+
+  public static BulkDeleter bulkDeleteManifests(FileIO io, Iterable<ManifestFile> files) {
+    return bulkDelete(io, Iterables.transform(files, ManifestFile::path));
+  }
+
+  public static BulkDeleter bulkDeleteFiles(FileIO io, Iterable<? extends ContentFile<?>> files) {
+    return bulkDelete(io, Iterables.transform(files, file -> file.path().toString()));
+  }
+
+  public static BulkDeleter bulkDelete(FileIO io, Iterable<String> files) {
+    return new BulkDeleter(io, files);
+  }
+
+  public static BulkDeleter bulkDelete(FileIO io, String file) {
+    return new BulkDeleter(io, Sets.newHashSet(file));
+  }
+
+  public static class BulkDeleter {
+    private final FileIO io;
+    private final Iterable<String> files;
+    private String name = "files";
+    private ExecutorService service = null;
+
+    private BulkDeleter(FileIO io, Iterable<String> files) {
+      this.io = io;
+      this.files = files;
+    }
+
+    public BulkDeleter name(String newName) {
+      this.name = newName;
+      return this;
+    }
+
+    public BulkDeleter executeWith(ExecutorService svc) {
+      this.service = svc;
+      return this;
+    }
+
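+    /**
+     * Runs the delete: uses {@link SupportsBulkOperations#deleteFiles(Iterable)} when the
+     * underlying {@code FileIO} supports bulk deletes, and falls back to per-file {@link
+     * FileIO#deleteFile(String)} calls otherwise. The executor passed to
+     * {@link #executeWith(ExecutorService)} is only used by the per-file fallback. Failed
+     * deletes are logged at WARN and suppressed on both paths rather than rethrown.
+     */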
+    public void execute() {
+      if (io instanceof SupportsBulkOperations) {
+        try {
+          SupportsBulkOperations bulkIO = (SupportsBulkOperations) io;
+          bulkIO.deleteFiles(files);
+        } catch (BulkDeletionFailureException e) {
+          LOG.warn("Failed to delete {} {}", e.numberFailedObjects(), name, e);
+        } catch (Exception e) {
+          LOG.warn("Failed to bulk delete {}", name, e);
+        }
+      } else {
+        Tasks.foreach(files)
+            .noRetry()
+            .executeWith(service)
+            .suppressFailureWhenFinished()
+            .onFailure((file, exc) -> LOG.warn("Delete failed for {}: {}", name, file, exc))
+            .run(io::deleteFile);
+      }
+    }
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index c38a394c5a25..63d93df97fde 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -59,8 +59,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
@@ -678,11 +678,9 @@ public static List<SparkPartition> filterPartitions(
   }
 
   private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
-    Tasks.foreach(manifests)
+    FileIOUtil.bulkDeleteManifests(io, manifests)
         .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .run(item -> io.deleteFile(item.path()));
+        .execute();
   }
 
   /**
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1e0034eb3005..fa8122e1f73f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -53,8 +53,8 @@
 import org.apache.iceberg.spark.SparkDataFile;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -322,12 +322,7 @@ private void replaceManifests(
   }
 
   private void deleteFiles(Iterable<String> locations) {
-    Tasks.foreach(locations)
-        .executeWith(ThreadPools.getWorkerPool())
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(fileIO::deleteFile);
+    FileIOUtil.bulkDelete(fileIO, locations).executeWith(ThreadPools.getWorkerPool()).execute();
   }
 
   private static ManifestFile writeManifest(
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 32d603d5a794..f0108272e4ab 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -61,10 +61,10 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.StructProjection;
-import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
@@ -144,11 +144,7 @@ public DeltaBatchWrite toBatch() {
   }
 
   private static <F extends ContentFile<F>> void cleanFiles(FileIO io, Iterable<F> files) {
-    Tasks.foreach(files)
-        .executeWith(ThreadPools.getWorkerPool())
-        .throwFailureWhenFinished()
-        .noRetry()
-        .run(file -> io.deleteFile(file.path().toString()));
+    FileIOUtil.bulkDeleteFiles(io, files).executeWith(ThreadPools.getWorkerPool()).execute();
   }
 
   private class PositionDeltaBatchWrite implements DeltaBatchWrite {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 52e43d3484a6..5929dfd7ff57 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -68,6 +68,7 @@
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.FileIOUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -674,11 +675,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
   }
 
   private static <F extends ContentFile<F>> void deleteFiles(FileIO io, List<F> files) {
-    Tasks.foreach(files)
-        .executeWith(ThreadPools.getWorkerPool())
-        .throwFailureWhenFinished()
-        .noRetry()
-        .run(file -> io.deleteFile(file.path().toString()));
+    FileIOUtil.bulkDeleteFiles(io, files).executeWith(ThreadPools.getWorkerPool()).execute();
   }
 
   private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {