diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 5b009c65b7aa..65c90b1818e1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -47,4 +47,9 @@ private SparkSQLProperties() {}
   public static final String PRESERVE_DATA_GROUPING =
       "spark.sql.iceberg.planning.preserve-data-grouping";
   public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
+
+  // Controls how many physical file deletes to execute in parallel when not otherwise specified
+  public static final String DELETE_PARALLELISM =
+      "spark.sql.iceberg.driver-delete-default-parallelism";
+  public static final int DELETE_PARALLELISM_DEFAULT = 25;
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index cdd80040fa9e..f8ba824946a1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -22,6 +22,7 @@
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.lit;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -56,9 +57,12 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.JobGroupUtils;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -69,6 +73,7 @@
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
 
 abstract class BaseSparkAction<ThisT> {
 
@@ -218,6 +223,31 @@ private Dataset<FileInfo> toFileInfoDS(List<String> paths, String type) {
     return spark.createDataset(fileInfoList, FileInfo.ENCODER);
   }
 
+  /**
+   * Uses a fixed thread pool sized by {@link SparkSQLProperties#DELETE_PARALLELISM} when the
+   * executorService passed to this function is null.
+   */
+  protected void withDefaultDeleteService(
+      ExecutorService executorService, Consumer<ExecutorService> func) {
+    ExecutorService deleteService = executorService;
+
+    boolean createdDefaultDeleteService = false;
+    if (deleteService == null) {
+      int numThreads = PropertyUtil.propertyAsInt(
+              JavaConverters.mapAsJavaMap(spark.conf().getAll()),
+              SparkSQLProperties.DELETE_PARALLELISM,
+              SparkSQLProperties.DELETE_PARALLELISM_DEFAULT);
+      deleteService = ThreadPools.newWorkerPool(this + "-default-delete-service", numThreads);
+      createdDefaultDeleteService = true;
+    }
+
+    func.accept(deleteService);
+
+    if (createdDefaultDeleteService) {
+      deleteService.shutdown();
+    }
+  }
+
   /**
    * Deletes files and keeps track of how many files were removed for each file type.
    *
@@ -231,24 +261,28 @@ protected DeleteSummary deleteFiles(
       ExecutorService executorService, Consumer<String> deleteFunc, Iterator<FileInfo> files) {
     DeleteSummary summary = new DeleteSummary();
 
-    Tasks.foreach(files)
-        .retry(DELETE_NUM_RETRIES)
-        .stopRetryOn(NotFoundException.class)
-        .suppressFailureWhenFinished()
-        .executeWith(executorService)
-        .onFailure(
-            (fileInfo, exc) -> {
-              String path = fileInfo.getPath();
-              String type = fileInfo.getType();
-              LOG.warn("Delete failed for {}: {}", type, path, exc);
-            })
-        .run(
-            fileInfo -> {
-              String path = fileInfo.getPath();
-              String type = fileInfo.getType();
-              deleteFunc.accept(path);
-              summary.deletedFile(path, type);
-            });
+    withDefaultDeleteService(
+        executorService,
+        (deleteService) -> {
+          Tasks.foreach(files)
+              .retry(DELETE_NUM_RETRIES)
+              .stopRetryOn(NotFoundException.class)
+              .suppressFailureWhenFinished()
+              .executeWith(deleteService)
+              .onFailure(
+                  (fileInfo, exc) -> {
+                    String path = fileInfo.getPath();
+                    String type = fileInfo.getType();
+                    LOG.warn("Delete failed for {}: {}", type, path, exc);
+                  })
+              .run(
+                  fileInfo -> {
+                    String path = fileInfo.getPath();
+                    String type = fileInfo.getType();
+                    deleteFunc.accept(path);
+                    summary.deletedFile(path, type);
+                  });
+        });
 
     return summary;
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 1abd2107ed7f..ac227581ba47 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -246,12 +246,15 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    withDefaultDeleteService(
+        deleteExecutorService,
+        deleteService ->
+            Tasks.foreach(orphanFiles)
+                .noRetry()
+                .executeWith(deleteService)
+                .suppressFailureWhenFinished()
+                .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+                .run(deleteFunc::accept));
 
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
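Not part of the patch, but a minimal usage sketch for reviewers (the class name, the pool size of 8, and the one-day cutoff are arbitrary; catalog setup and table loading are environment-specific and omitted). An action that is not handed an explicit pool via executeDeleteWith(...) now deletes files on the driver with the configured fallback parallelism:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class OrphanCleanupExample {

  // No executeDeleteWith(...) pool is supplied, so with this patch the action
  // falls back to a driver-side pool sized by DELETE_PARALLELISM.
  static DeleteOrphanFiles.Result cleanOrphans(SparkSession spark, Table table) {
    long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    return SparkActions.get(spark).deleteOrphanFiles(table).olderThan(cutoff).execute();
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("orphan-cleanup")
            // Sizes the fallback delete pool added by this patch (default: 25 threads).
            .config("spark.sql.iceberg.driver-delete-default-parallelism", "8")
            .getOrCreate();
    // Load a table from your catalog here, then call cleanOrphans(spark, table).
  }
}

Callers that already pass executeDeleteWith(...) see no behavior change: the helper only builds (and shuts down) a pool when the supplied service is null.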
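The ownership rule the helper follows (shut down only a pool it created itself) can be illustrated without Spark. This is a framework-free sketch of the same pattern, with all names my own; note that, unlike the patch, it releases the pool in a finally block so an exception thrown by func cannot leak threads:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class DefaultPoolPattern {

  private static final int DEFAULT_THREADS = 25; // stand-in for DELETE_PARALLELISM_DEFAULT

  // Runs func with the caller's pool when one is given; otherwise creates a
  // fixed pool and disposes of it afterwards. Caller-owned pools are left alone.
  static void withDefaultService(ExecutorService callerPool, Consumer<ExecutorService> func) {
    ExecutorService pool =
        callerPool != null ? callerPool : Executors.newFixedThreadPool(DEFAULT_THREADS);
    try {
      func.accept(pool);
    } finally {
      if (pool != callerPool) {
        pool.shutdown(); // only the pool created here; already-submitted work still drains
      }
    }
  }

  public static void main(String[] args) {
    // Passing null makes the helper create and then dispose of its own pool.
    withDefaultService(null, pool -> pool.submit(() -> System.out.println("delete some/path")));
  }
}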