
Conversation

@gaborkaszab
Collaborator

The current implementation uses the FileIO's deleteFile() even when the FileIO supports bulk operations. Even though the user of the RemoveSnapshots API can provide a custom Consumer to perform bulk deletion, Iceberg itself can be clever enough to detect whether bulk deletion is possible with the given FileIO.
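In code, the idea is roughly the following (a minimal sketch assuming Iceberg's FileIO and SupportsBulkOperations interfaces; the helper class and method names are illustrative, not from this PR):

    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.SupportsBulkOperations;

    class BulkAwareDeletes {
      // Prefer the FileIO's native bulk deletion when available; otherwise
      // fall back to deleting the files one by one.
      static void deleteAll(FileIO io, Iterable<String> paths) {
        if (io instanceof SupportsBulkOperations) {
          ((SupportsBulkOperations) io).deleteFiles(paths);
        } else {
          paths.forEach(io::deleteFile);
        }
      }
    }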

github-actions bot added the core label on Dec 20, 2024
@gaborkaszab force-pushed the main_bulk_delete_in_remove_snapshots branch from 0c875f9 to aafd0fa on December 20, 2024 15:57
@gaborkaszab force-pushed the main_bulk_delete_in_remove_snapshots branch 3 times, most recently from 87083d5 to d0638e5 on January 13, 2025 11:10
@steveloughran
Contributor

I'm going to suggest some tests of failure handling to see what happens there

@gaborkaszab
Collaborator Author

I'm going to suggest some tests of failure handling to see what happens there

Hi @steveloughran ,
The expire snapshot tests don't exercise the differences between FileIO implementations. The main point of these tests is to verify that the desired interface is called with the desired parameters, but they use TestTableOperations with LocalFileIO.

@gaborkaszab
Collaborator Author

Hi @amogh-jahagirdar ,
Would you mind taking a look? This PR came out of a conversation with you on Iceberg Slack: https://apache-iceberg.slack.com/archives/C03LG1D563F/p1733215233582339

@gaborkaszab
Collaborator Author

@amogh-jahagirdar Would you mind taking a look? This came out of a Slack discussion we had earlier.

cc @pvary in case you have some capacity for this.
Thanks!

* safe.
*/
public class BulkDeleteConsumer implements Consumer<String> {
private final List<String> files = Lists.newArrayList();
Contributor

I'm a bit afraid that this list could become quite big.
Could we "flush" the delete in batches?

@steveloughran
Contributor

Could also make for a slow delete at the end. Ideally there'd be a page size for deletes, say 1000, and then kick off the delete in a separate thread.

Both S3A and S3FileIO have a configurable page size; s3a bulk delete is also rate limited per bucket.

@amogh-jahagirdar
Contributor

+1 to flushing in batches, the list of files can be quite large for snapshot expiration. I think having a constant 1000 is fine to begin with.

I don't think it's strictly required to kick off the delete in a separate thread, and I would prefer to keep it simple at least for now. We generally perform bulk deletes in maintenance operations which are already long-running, and a good chunk of that time is spent in CPU/memory-bound computations of which files to delete rather than actually doing the deletion.

If it's a real issue I'd table that as an optimization for later.
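A rough sketch of the batched flushing discussed here (assuming the SupportsBulkOperations interface; the class name, the BATCH_SIZE of 1000, and the explicit flush() call are illustrative — the merged change ultimately left batching to the FileIO):

    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.iceberg.io.SupportsBulkOperations;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    class BatchedDeleteConsumer implements Consumer<String> {
      private static final int BATCH_SIZE = 1000;

      private final SupportsBulkOperations ops;
      private final List<String> files = Lists.newArrayList();

      BatchedDeleteConsumer(SupportsBulkOperations ops) {
        this.ops = ops;
      }

      @Override
      public void accept(String path) {
        files.add(path);
        if (files.size() >= BATCH_SIZE) {
          flush();
        }
      }

      // Callers must invoke this once at the end to delete the remaining tail.
      void flush() {
        if (files.isEmpty()) {
          return;
        }
        ops.deleteFiles(files);
        files.clear();
      }
    }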

@gaborkaszab
Collaborator Author
Feb 14, 2025

I've just checked the CatalogUtil.deleteFiles() functionality, and apparently it does the batching by grouping the files to delete by manifest and performing a batch delete for each manifest separately, if I understand it right. Let me see whether, instead of implementing batching logic here, it is feasible to do the 'per manifest' batching.

Just for the record, I've checked S3FileIO and that seems to take care of batching the deletes itself. I'll have to investigate this further, but it seems to me that we'd be doing double batching without much gain, just extra code complexity.

return;
}

ops.deleteFiles(files);
Contributor

Do we want to do retry, error handling?

@steveloughran
Contributor

Retry: no, they should do it themselves. If you add a layer of retry on top of their code, you simply double wrap the failures for exponential delays before giving up.

Do not try and be clever here. Look at the S3A connector policy, recognise how complicated it is, different policies for connectivity vs throttle vs other errors, what can be retried, how long to wait/backoff, etc etc.
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java

Double wrapping retries is a real PITA; it's bad enough that the V2 SDK has taken to retrying some things (UnknownHostException) that it never used to... adding yet more retrying in the app only makes things worse.

regarding error handling: what is there to do other than report an error?

@pvary
Contributor

The original delete path had retries. See:

Tasks.foreach(pathsToDelete)
          .executeWith(deleteExecutorService)
          .retry(3)
          .stopRetryOn(NotFoundException.class)
          .stopOnFailure()
          .suppressFailureWhenFinished()
          .onFailure(
              (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
          .run(deleteFunc::accept);

I think we should match the original behavior.

@amogh-jahagirdar
Contributor

Yeah, I think I agree with @pvary that to begin with we should probably mimic the existing delete retry behavior. In terms of error handling, the deletion is all best effort. No operation should be impeded due to failure to physically delete a file off disk.

Though I understand @steveloughran's point that double wrapping retries is not good either, since we're essentially retrying 3 * num_sdk_retries times on every retryable failure, which just keeps clients busy for unnecessarily long.

I think there's a worthwhile discussion to be had in a follow-up, though, on whether we want to tune these retry behaviors in their entirety to account for clients already performing retries.

I also don't know what the other clients such as Azure/GCS do in terms of automatic retries (since we want whatever is here to generalize across other systems).

@gaborkaszab
Collaborator Author
Feb 14, 2025

Thanks for the observations! I know the sequential delete approach here does retries, but I also checked CatalogUtil.deleteFiles() (which is used by the dropTable() functionality), and apparently there we don't do retries.
I checked the DeleteOrphanFilesSparkAction too, and apparently that also doesn't do retries for either the bulk or the sequential deletes.
Also, it doesn't do batching; it leaves that to the FileIO (relevant for the other question on this PR).

Additionally, if bulk deletion fails, it's possible that a subset of the files was deleted, but we won't know which ones; we only get a number of failures from the exception. So in case of partial success/failure, if we retry the bulk delete with the same list of paths, it will fail again and again.
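A small sketch of that partial-failure behavior (assuming Iceberg's BulkDeletionFailureException, which exposes only a count of failed objects via numberFailedObjects(); class and method names are illustrative):

    import java.util.List;
    import org.apache.iceberg.io.BulkDeletionFailureException;
    import org.apache.iceberg.io.SupportsBulkOperations;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class BestEffortBulkDelete {
      private static final Logger LOG = LoggerFactory.getLogger(BestEffortBulkDelete.class);

      static void delete(SupportsBulkOperations ops, List<String> paths) {
        try {
          ops.deleteFiles(paths);
        } catch (BulkDeletionFailureException e) {
          // Best effort: the exception tells us how many deletes failed, but
          // not which paths, so blindly retrying the same list would fail again.
          LOG.warn("Failed to delete {} of {} files", e.numberFailedObjects(), paths.size(), e);
        }
      }
    }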

@steveloughran left a comment
Contributor

commented; no actual code suggestions

@amogh-jahagirdar
Contributor

Sorry for the late follow-up, I'm taking a look!

* Consumer class to collect file paths one by one and perform a bulk deletion on them. Not thread
* safe.
*/
public class BulkDeleteConsumer implements Consumer<String> {
@amogh-jahagirdar
Contributor

Does this need to be public? It'd be ideal if this could be package-private and encapsulated in the core places where it's needed.

@gaborkaszab
Collaborator Author

The idea was that users could also provide their own implementation of this, so they can plug in consumers that are capable of using the bulk delete interface.

@gaborkaszab
Collaborator Author

Giving this a second thought, and after checking the DeleteOrphanFilesSparkAction code, I think I overcomplicated things here. In DeleteOrphanFilesSparkAction the deleteFunc is also pluggable, but it keeps things simple:
- If there is a plugged-in deleteFunc (which is a Consumer), use it.
- If there is no plugged-in deleteFunc and the FileIO supports bulk delete, do a bulk delete.
- Otherwise, do a sequential delete.
See: https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java#L253
The difference is that for expire snapshots the deleteFunc won't be null, but that's a minor thing I think. A sketch of this dispatch order follows below.
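A minimal sketch of that dispatch order (illustrative class and method names, not the actual DeleteOrphanFilesSparkAction code):

    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.SupportsBulkOperations;

    class DeleteDispatch {
      // Mirrors the DeleteOrphanFilesSparkAction order: a plugged-in deleteFunc
      // wins, then bulk deletion if the FileIO supports it, then one-by-one.
      static void deleteFiles(FileIO io, Consumer<String> deleteFunc, List<String> paths) {
        if (deleteFunc != null) {
          paths.forEach(deleteFunc);
        } else if (io instanceof SupportsBulkOperations) {
          ((SupportsBulkOperations) io).deleteFiles(paths);
        } else {
          paths.forEach(io::deleteFile);
        }
      }
    }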


@gaborkaszab
Collaborator Author

Thanks for taking a look @pvary, @amogh-jahagirdar and @steveloughran! I'll be offline for a couple of days but will take a deeper look after that.

Another thing Peter and I discussed, apart from the comments above, is that the current PR abuses the concept of a Consumer<String>, because we now try to use it as a kind of Consumer<Iterable<String>>. What makes this a bit complicated is that the deleteFunc is pluggable and the API accepts a Consumer<String>. I'll consider a slightly different approach where we wrap that pluggable deleteFunc into a Consumer<Iterable<String>> or similar (see the sketch at the end of this comment). Let's see how that works out.

I also responded to the current comments.
Thanks again for taking a look!
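A tiny sketch of the wrapping idea mentioned above (purely illustrative; the PR later moved to the orphan-file-style approach instead):

    import java.util.function.Consumer;

    class DeleteFuncAdapter {
      // Adapts a per-file Consumer<String> into a bulk Consumer<Iterable<String>>,
      // so a plugged-in deleteFunc and a bulk-capable FileIO can share one shape.
      static Consumer<Iterable<String>> asBulk(Consumer<String> perFile) {
        return paths -> paths.forEach(perFile);
      }
    }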

@gaborkaszab force-pushed the main_bulk_delete_in_remove_snapshots branch from d0638e5 to f28fb78 on February 28, 2025 14:55
@gaborkaszab
Collaborator Author

Hi @amogh-jahagirdar , @pvary ,

I managed to simplify the original PR and uploaded a new version. I took a deeper look at how orphan file deletion does the same thing, and I basically follow the same approach here too.
In a nutshell, I think I overcomplicated my first approach because I also wanted to implement bulk deletion for the plugged-in deleteFunc. However, for orphan files we don't do that; rather, we do the bulk deletion when there is no plugged-in deleteFunc and the FileIO supports bulk deletion.

Additionally, there were comments about retries and batching the files. I believe these are not needed here, and this is also in line with orphan file cleanup.

Let me know what you think!

@pvary
Contributor

pvary commented Feb 28, 2025

Why did you decide against a util method? The code is really very similar to the orphan file removal stuff.

@gaborkaszab
Collaborator Author

Why did you decide against a util method? The code is really very similar to the orphan file removal stuff.

I figured that first we should agree on the approach, because last time we were talking about retries and batch deletes. Once the approach is fine, we can think about refactoring this and moving the code to a common util.

However, I checked the code, and apparently there are multiple places where we apply very similar logic, so I thought we might want to merge this PR without the refactor, and then as a next step I could look at moving all these occurrences into a common util class: DeleteOrphanFilesSparkAction for multiple versions, CatalogUtil.deleteFiles(), SparkCleanupUtil for multiple versions, DeleteReachableFilesSparkAction for multiple versions, and there might be more. Not all of them allow a pluggable deleteFunc, but they could still use the common implementation by passing null.

WDYT?

@SuppressWarnings("checkstyle:VisibilityModifier")
abstract class FileCleanupStrategy {
private final Consumer<String> defaultDeleteFunc =
new Consumer<String>() {
@pvary
Contributor

nit: String is not needed here (the diamond operator is enough)

@gaborkaszab
Collaborator Author

Done

@pvary
Contributor

pvary commented Mar 11, 2025

I'm fine with the current approach.
Could you please rebase and address the little nit?
Thanks,
Peter


@TestTemplate
public void testRemoveFromTableWithBulkIO() {
TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO());
Contributor

I don't get this approach.
If we create our own FileIO implementation, why do we use Mockito to spy?

Contributor

Could we just use a TestFileIO which does the counting, and then we can forget about the Mockito in this test?

@gaborkaszab
Collaborator Author

I think it's cleaner to use a Mockito spy on these functions and perform the verification steps on the spy. The custom FileIO was needed to have a test FileIO that derives from SupportsBulkOperations and has a deleteFiles() function. Sure, we could introduce a Set in our FileIO implementation that collects the paths received as parameters, plus a counter for how many times deleteFiles() was called, and we could even separate the received paths by call, but that is unnecessary hand-written code, as Mockito already gives us all of this.
See L1700-1703: we verify that deleteFiles() was called 3 times, and we can verify the given paths broken down by each call of the function. With this we can see that even in a scenario where we get an exception during deletion, we still call deleteFiles() for all the other file types too.

I prefer the current implementation to writing custom verification code, but I might be missing something here. See the sketch below for the pattern.
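A sketch of that verification pattern (TestBulkLocalFileIO comes from this PR's test code; the call count of 3 matches the discussion above, and the rest is standard Mockito usage):

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    // Inside the test, using the PR's TestBulkLocalFileIO:
    TestBulkLocalFileIO spyFileIO = spy(new TestBulkLocalFileIO());
    // ... build the table on top of spyFileIO and run snapshot expiration ...
    // Verify bulk deletion was used, once per deleted file type:
    verify(spyFileIO, times(3)).deleteFiles(any());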

@pvary
Contributor

Could we use something like this then:

    SupportsBulkOperations bulkFileIO = Mockito.mock(SupportsBulkOperations.class, withSettings().extraInterfaces(FileIO.class));

@gaborkaszab
Collaborator Author

Thanks for the suggestion, @pvary! I tried this out, but apparently it's not suitable for these tests to have the FileIO completely mocked; they need at least a TestFileIO instance to be able to perform operations on the test tables. A very early step (tableWithBulkIO.newAppend().appendFile(FILE_A).commit();) fails if there is only a mock, and no real FileIO, under the table.

The current implementation uses the deleteFile() of the FileIO even if it
supports bulk operations. Even though the user of the RemoveSnapshots API can
provide a custom Consumer to perform bulk deletion, Iceberg can be clever enough
itself to find out if bulk deletion is possible on the FileIO.
@gaborkaszab force-pushed the main_bulk_delete_in_remove_snapshots branch from f28fb78 to 944a32b on March 13, 2025 10:03
@pvary
Contributor

pvary commented Mar 14, 2025

LGTM +1
Let's wait a bit, so if anyone would like to comment, they can.
Will merge next week, unless there are any objections in the meantime

@pvary merged commit 845ef51 into apache:main on Mar 21, 2025
42 checks passed
@pvary
Contributor

pvary commented Mar 21, 2025

Merged to main.
Thanks @gaborkaszab for the PR, and thanks to all of the reviewers too!

lliangyu-lin pushed a commit to lliangyu-lin/iceberg that referenced this pull request Mar 23, 2025