@@ -47,4 +47,8 @@ private SparkSQLProperties() {}
public static final String PRESERVE_DATA_GROUPING =
"spark.sql.iceberg.planning.preserve-data-grouping";
public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;

// Controls how many physical file deletes to execute in parallel when no executor is supplied
public static final String DELETE_PARALLELISM = "spark.sql.iceberg.driver-delete-default-parallelism";
public static final String DELETE_PARALLELISM_DEFAULT = "25";
RussellSpitzer (Member, Author) commented:
With S3's request throttling at roughly 4,000 requests per second, this default leaves plenty of headroom. Assuming a 50 ms response time, each thread issues about 20 requests per second, so the throttling ceiling works out to 4,000 requests/second ÷ 20 requests/thread/second ≈ 200 concurrent requests.

Another option is to also incorporate the bulk delete APIs, but that would only help with S3-based filesystems.

}
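
As a usage note on the new property, a minimal sketch of overriding the default parallelism from a live session (the `spark` variable and the value 100 are assumptions for illustration, not part of this diff):

// Hypothetical override: raise the driver-side delete parallelism from the default of 25 to 100
spark.conf().set(SparkSQLProperties.DELETE_PARALLELISM, "100");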
@@ -56,9 +56,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -218,6 +220,30 @@ private Dataset<FileInfo> toFileInfoDS(List<String> paths, String type) {
return spark.createDataset(fileInfoList, FileInfo.ENCODER);
}

/**
 * Runs {@code func} with the given executor service, falling back to a fixed thread pool with
 * {@link SparkSQLProperties#DELETE_PARALLELISM} threads when {@code executorService} is null.
 */
protected void withDefaultDeleteService(ExecutorService executorService, Consumer<ExecutorService> func) {
  ExecutorService deleteService = executorService;

  boolean createdDefaultDeleteService = false;
  if (deleteService == null) {
    int numThreads = Integer.parseInt(
        spark.conf().get(
            SparkSQLProperties.DELETE_PARALLELISM,
            SparkSQLProperties.DELETE_PARALLELISM_DEFAULT));
    deleteService = ThreadPools.newWorkerPool(this + "-default-delete-service", numThreads);
    createdDefaultDeleteService = true;
  }

  try {
    func.accept(deleteService);
  } finally {
    // Shut down only the pool created here; a caller-supplied executor stays under the caller's control
    if (createdDefaultDeleteService) {
      deleteService.shutdown();
    }
  }
}

/**
* Deletes files and keeps track of how many files were removed for each file type.
*
@@ -231,24 +257,27 @@ protected DeleteSummary deleteFiles(

DeleteSummary summary = new DeleteSummary();

Tasks.foreach(files)
.retry(DELETE_NUM_RETRIES)
.stopRetryOn(NotFoundException.class)
.suppressFailureWhenFinished()
.executeWith(executorService)
.onFailure(
(fileInfo, exc) -> {
String path = fileInfo.getPath();
String type = fileInfo.getType();
LOG.warn("Delete failed for {}: {}", type, path, exc);
})
.run(
fileInfo -> {
String path = fileInfo.getPath();
String type = fileInfo.getType();
deleteFunc.accept(path);
summary.deletedFile(path, type);
});

withDefaultDeleteService(executorService, (deleteService) -> {
RussellSpitzer (Member, Author) commented on Jan 14, 2023:
This covers ExpireSnapshots and DropTable with Purge (via DeleteReachableFilesAction)

Tasks.foreach(files)
.retry(DELETE_NUM_RETRIES)
.stopRetryOn(NotFoundException.class)
.suppressFailureWhenFinished()
.executeWith(deleteService)
.onFailure(
(fileInfo, exc) -> {
String path = fileInfo.getPath();
String type = fileInfo.getType();
LOG.warn("Delete failed for {}: {}", type, path, exc);
})
.run(
fileInfo -> {
String path = fileInfo.getPath();
String type = fileInfo.getType();
deleteFunc.accept(path);
summary.deletedFile(path, type);
});
});

return summary;
}
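
For context beyond this diff, a hedged consumer-side sketch of the two code paths: an executor wired in via executeDeleteWith is used as-is and left running, while omitting it makes the action build and shut down its own pool sized by the new property. The `spark` and `table` variables, pool name, and thread count are assumptions for illustration:

import java.util.concurrent.ExecutorService;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.util.ThreadPools;

// Caller-managed pool: the action uses it but never shuts it down
ExecutorService myPool = ThreadPools.newWorkerPool("my-delete-pool", 8);
SparkActions.get(spark)
    .expireSnapshots(table)
    .executeDeleteWith(myPool)
    .execute();
myPool.shutdown();

// No pool supplied: deleteFiles falls back to a worker pool of
// DELETE_PARALLELISM threads (default 25) and shuts it down afterwards
SparkActions.get(spark)
    .expireSnapshots(table)
    .execute();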
@@ -246,12 +246,13 @@ private DeleteOrphanFiles.Result doExecute() {
List<String> orphanFiles =
findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
.run(deleteFunc::accept);
withDefaultDeleteService(deleteExecutorService, deleteService ->
RussellSpitzer (Member, Author) commented:
This change covers RemoveOrphanFiles

Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
.run(deleteFunc::accept));

return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}