Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes #6588
Conversation
@anuragmantri + @aokolnychyi + @rdblue - This is a bit of a big default behavior change, but it's been biting a lot of our users lately and the change is relatively safe.
// Controls how many physical file deletes to execute in parallel when not otherwise specified
public static final String DELETE_PARALLELISM = "driver-delete-default-parallelism";
public static final String DELETE_PARALLELISM_DEFAULT = "25";
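A minimal sketch of how a property like this might be consumed: read the configured value (falling back to the default of 25) and size a fixed thread pool for driver-side deletes. The property names match the snippet above, but the pool construction here is illustrative, not the actual Iceberg implementation.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DeletePoolSketch {
  static final String DELETE_PARALLELISM = "driver-delete-default-parallelism";
  static final String DELETE_PARALLELISM_DEFAULT = "25";

  // Resolve the parallelism from a config map, falling back to the default of 25
  public static int deleteParallelism(Map<String, String> conf) {
    return Integer.parseInt(
        conf.getOrDefault(DELETE_PARALLELISM, DELETE_PARALLELISM_DEFAULT));
  }

  public static void main(String[] args) throws Exception {
    int threads = deleteParallelism(Map.of()); // no override configured -> 25
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    AtomicInteger deleted = new AtomicInteger();
    for (int i = 0; i < 100; i++) {
      pool.submit(deleted::incrementAndGet); // stand-in for one physical file delete
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(threads + " " + deleted.get()); // 25 100
  }
}
```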
With S3's request throttling at around 4k requests per second, this gives us a lot of headroom. Assuming a 50 ms response time per delete, each thread can issue about 20 requests per second, so the throttle allows roughly 4000 / 20 = 200 concurrent requests.
Another option is to also incorporate the "bulk delete" APIs, but that would only help with S3-based filesystems.
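The back-of-envelope math above can be checked directly. The throttle limit and latency figures are the comment's assumptions, not measured values:

```java
public class ThrottleMath {
  public static void main(String[] args) {
    int s3LimitPerSec = 4000;      // assumed S3 request throttle, requests/second
    int responseTimeMs = 50;       // assumed latency of one DELETE request

    // One thread issuing sequential 50 ms requests does 1000 / 50 = 20 requests/s
    int requestsPerThreadPerSec = 1000 / responseTimeMs;

    // Threads that would saturate the throttle: 4000 / 20 = 200
    int maxConcurrentThreads = s3LimitPerSec / requestsPerThreadPerSec;
    System.out.println(maxConcurrentThreads); // 200, so a default of 25 is well under the limit
  }
}
```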
summary.deletedFile(path, type);
});

withDefaultDeleteService(executorService, (deleteService) -> {
This covers ExpireSnapshots and DropTable with Purge (via DeleteReachableFilesAction)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
.run(deleteFunc::accept);
withDefaultDeleteService(deleteExecutorService, deleteService ->
This change covers RemoveOrphanFiles
anuragmantri left a comment
Thanks for this change @RussellSpitzer. It looks safe to me.
I left a minor comment. Also, the build failed with code format violations; you may want to run gradlew spotlessApply on the file.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
Let me take a look soon.
Talking with @aokolnychyi, we decided we are going to take a slightly broader approach here. Rather than allowing each action to define its own method of deleting and use custom executor services, we will fall back to each FileIO's bulk delete support. We will then add a basic parallel default delete to HDFS for bulk delete. After doing this we will deprecate all the "parallel delete" methods from the actions and procedures, instead instructing users to use their IO-specific parallelism controls.
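The fallback described here could look roughly like the sketch below: use the IO's bulk delete when it advertises one (as S3FileIO does), otherwise fan the deletes out over a thread pool. The interfaces and the deleteAll helper are simplified stand-ins, not the exact Iceberg API shape.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BulkDeleteFallback {
  // Simplified stand-ins for Iceberg's FileIO abstractions
  public interface FileIO { void deleteFile(String path); }
  public interface SupportsBulkOperations extends FileIO {
    void deleteFiles(Iterable<String> paths);
  }

  // Prefer the IO's native bulk delete; otherwise delete in parallel on a pool
  public static void deleteAll(FileIO io, List<String> paths, int parallelism)
      throws InterruptedException {
    if (io instanceof SupportsBulkOperations) {
      ((SupportsBulkOperations) io).deleteFiles(paths); // e.g. one S3 DeleteObjects call
    } else {
      ExecutorService pool = Executors.newFixedThreadPool(parallelism);
      for (String path : paths) {
        pool.submit(() -> io.deleteFile(path));
      }
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger deleted = new AtomicInteger();
    FileIO plainIo = path -> deleted.incrementAndGet(); // HDFS-like IO, no bulk support
    deleteAll(plainIo, List.of("a", "b", "c", "d"), 2);
    System.out.println(deleted.get()); // 4
  }
}
```

With this split, the actions no longer need their own executor-service plumbing; parallelism becomes a property of the IO rather than of each procedure.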
Thanks for clarifying @RussellSpitzer. I think it makes a ton of sense to leave the specifics of bulk vs. parallel deletes to the FileIO abstraction. In this case, we leverage bulk delete wherever possible (S3) and use parallel deletions for file systems that don't support bulk ops, like HDFS, to improve the throughput of deletions.
An issue we've run into frequently is that several Spark actions perform deletes on the driver with a default parallelism of 1. This is quite slow for S3 and painfully slow for very large tables. To fix this, we change the default behavior to always use multithreaded deletes.
The default for all Spark-related actions can then be changed with a SQL conf parameter, as well as within each command via its own parallelism parameter.