diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index dae99c572c78..6cc210f35e12 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -23,8 +23,10 @@ import java.util.function.Consumer; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -32,6 +34,14 @@ @SuppressWarnings("checkstyle:VisibilityModifier") abstract class FileCleanupStrategy { + private final Consumer defaultDeleteFunc = + new Consumer<>() { + @Override + public void accept(String file) { + fileIO.deleteFile(file); + } + }; + private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class); protected final FileIO fileIO; @@ -75,14 +85,32 @@ protected CloseableIterable readManifests(Snapshot snapshot) { } protected void deleteFiles(Set pathsToDelete, String fileType) { - Tasks.foreach(pathsToDelete) - .executeWith(deleteExecutorService) - .retry(3) - .stopRetryOn(NotFoundException.class) - .suppressFailureWhenFinished() - .onFailure( - (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown)) - .run(deleteFunc::accept); + if (deleteFunc == null && fileIO instanceof SupportsBulkOperations) { + try { + ((SupportsBulkOperations) fileIO).deleteFiles(pathsToDelete); + } catch (BulkDeletionFailureException e) { + LOG.warn( + "Bulk deletion failed for {} of {} {} file(s)", + e.numberFailedObjects(), + pathsToDelete.size(), + fileType, + e); + } catch (RuntimeException e) { + LOG.warn("Bulk deletion failed", e); + } + } else { + Consumer deleteFuncToUse = deleteFunc == null ? defaultDeleteFunc : deleteFunc; + + Tasks.foreach(pathsToDelete) + .executeWith(deleteExecutorService) + .retry(3) + .stopRetryOn(NotFoundException.class) + .stopOnFailure() + .suppressFailureWhenFinished() + .onFailure( + (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown)) + .run(deleteFuncToUse::accept); + } } protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) { diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 9418b0f00765..b147410b8537 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -65,14 +65,6 @@ class RemoveSnapshots implements ExpireSnapshots { private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = MoreExecutors.newDirectExecutorService(); - private final Consumer defaultDelete = - new Consumer() { - @Override - public void accept(String file) { - ops.io().deleteFile(file); - } - }; - private final TableOperations ops; private final Set idsToRemove = Sets.newHashSet(); private final long now; @@ -81,7 +73,7 @@ public void accept(String file) { private TableMetadata base; private long defaultExpireOlderThan; private int defaultMinNumSnapshots; - private Consumer deleteFunc = defaultDelete; + private Consumer deleteFunc = null; private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE; private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); private Boolean incrementalCleanup; diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index d60cba9ee204..637d0a91f02d 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.times; import java.io.File; import java.io.IOException; @@ -40,8 +41,10 @@ import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.puffin.Blob; import org.apache.iceberg.puffin.Puffin; import org.apache.iceberg.puffin.PuffinWriter; @@ -1625,6 +1628,83 @@ public void testRetainFilesOnRetainedBranches() { assertThat(deletedFiles).isEqualTo(expectedDeletes); } + @TestTemplate + public void testRemoveFromTableWithBulkIO() { + TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO()); + + Mockito.doNothing().when(spyFileIO).deleteFiles(any()); + + runBulkDeleteTest(spyFileIO); + } + + @TestTemplate + public void testBulkDeletionWithBulkDeletionFailureException() { + TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO()); + + Mockito.doThrow(new BulkDeletionFailureException(2)) + .doNothing() + .when(spyFileIO) + .deleteFiles(any()); + + runBulkDeleteTest(spyFileIO); + } + + @TestTemplate + public void testBulkDeletionWithRuntimeException() { + TestBulkLocalFileIO spyFileIO = Mockito.spy(new TestBulkLocalFileIO()); + + Mockito.doThrow(new RuntimeException("Exception when bulk deleting")) + .doNothing() + .when(spyFileIO) + .deleteFiles(any()); + + runBulkDeleteTest(spyFileIO); + } + + private void runBulkDeleteTest(TestBulkLocalFileIO spyFileIO) { + String tableName = "tableWithBulkIO"; + Table tableWithBulkIO = + TestTables.create( + tableDir, + tableName, + SCHEMA, + SPEC, + SortOrder.unsorted(), + formatVersion, + new TestTables.TestTableOperations(tableName, tableDir, spyFileIO)); + + tableWithBulkIO.newAppend().appendFile(FILE_A).commit(); + + Set deletedManifestLists = + Sets.newHashSet(tableWithBulkIO.currentSnapshot().manifestListLocation()); + Set deletedManifests = + tableWithBulkIO.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + + tableWithBulkIO.newDelete().deleteFile(FILE_A).commit(); + + deletedManifestLists.add(tableWithBulkIO.currentSnapshot().manifestListLocation()); + deletedManifests.addAll( + tableWithBulkIO.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet())); + + tableWithBulkIO.newAppend().appendFile(FILE_B).commit(); + + long lastSnapshotId = tableWithBulkIO.currentSnapshot().snapshotId(); + + removeSnapshots(tableWithBulkIO).expireOlderThan(System.currentTimeMillis()).commit(); + + assertThat(tableWithBulkIO.currentSnapshot().snapshotId()).isEqualTo(lastSnapshotId); + assertThat(tableWithBulkIO.snapshots()).containsOnly(tableWithBulkIO.currentSnapshot()); + + Mockito.verify(spyFileIO, times(3)).deleteFiles(any()); + Mockito.verify(spyFileIO).deleteFiles(Set.of(FILE_A.location())); + Mockito.verify(spyFileIO).deleteFiles(deletedManifestLists); + Mockito.verify(spyFileIO).deleteFiles(deletedManifests); + } + @TestTemplate public void testRemoveSpecDuringExpiration() { DataFile file = @@ -1867,4 +1947,18 @@ private static PartitionStatisticsFile reusePartitionStatsFile( private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) { table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); } + + private static class TestBulkLocalFileIO extends TestTables.LocalFileIO + implements SupportsBulkOperations { + + @Override + public void deleteFile(String path) { + throw new RuntimeException("Expected to call the bulk delete interface."); + } + + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { + throw new RuntimeException("Expected to mock this function"); + } + } }