diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index c9e5f7cca785..5925d806bff3 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -27,6 +27,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
@@ -36,6 +38,7 @@
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -83,10 +86,20 @@ public void accept(String file) {
         }
       };
 
+  // Default bulk delete: delegate deletion of the whole batch to the table's FileIO.
+  private final Consumer<Iterable<String>> bulkDelete =
+      new Consumer<Iterable<String>>() {
+        @Override
+        public void accept(Iterable<String> files) {
+          SupportsBulkOperations bulkOperations = (SupportsBulkOperations) ops.io();
+          bulkOperations.deleteFiles(files);
+        }
+      };
+
   private final Set<Long> expiredSnapshotIds = Sets.newHashSet();
   private Long expireOlderThanValue = null;
   private Integer retainLastValue = null;
   private Consumer<String> deleteFunc = defaultDelete;
+  private Consumer<Iterable<String>> bulkDeleteFunc = bulkDelete;
   private ExecutorService deleteExecutorService = null;
   private Dataset<Row> expiredFiles = null;
 
@@ -139,6 +152,14 @@ public ExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     return this;
   }
 
+  public ExpireSnapshotsSparkAction bulkDeleteWith(Consumer<Iterable<String>> newBulkDeleteFunc) {
+    Preconditions.checkArgument(
+        ops.io() instanceof SupportsBulkOperations,
+        "FileIO %s does not support bulk deletion",
+        ops.io().getClass().getName());
+    this.bulkDeleteFunc = newBulkDeleteFunc;
+    return this;
+  }
+
   /**
    * Expires snapshots and commits the changes to the table, returning a Dataset of files to delete.
    *
@@ -215,13 +236,71 @@ private String jobDesc() {
 
   private ExpireSnapshots.Result doExecute() {
     boolean streamResults =
         PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
-    if (streamResults) {
-      return deleteFiles(expire().toLocalIterator());
+    boolean supportsBulkDeletion = ops.io() instanceof SupportsBulkOperations;
+
+    if (supportsBulkDeletion) {
+      // Materialize the expired files exactly once; the bulk path needs the whole batch.
+      return deleteFilesInBulk(expire().collectAsList());
+    } else if (streamResults) {
+      return deleteFiles(expire().toLocalIterator());
     } else {
       return deleteFiles(expire().collectAsList().iterator());
     }
   }
 
+  /**
+   * Deletes the expired files in bulk, delegating the actual deletion to the table's FileIO.
+   *
+   * @param batch a list of Spark rows with the structure (path: String, type: String)
+   * @return statistics on the files that were deleted
+   */
+  private ExpireSnapshots.Result deleteFilesInBulk(List<Row> batch) {
+    Function<Row, String> fileName = r -> r.getString(0);
+    List<String> fileNames = batch.stream().map(fileName).collect(Collectors.toList());
+    bulkDeleteFunc.accept(fileNames);
+
+    long dataFileCount =
+        batch.stream()
+            .filter(r -> FileContent.DATA.name().equalsIgnoreCase(r.getString(1)))
+            .count();
+    long posDeleteFileCount =
+        batch.stream()
+            .filter(r -> FileContent.POSITION_DELETES.name().equalsIgnoreCase(r.getString(1)))
+            .count();
+    long eqDeleteFileCount =
+        batch.stream()
+            .filter(r -> FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(r.getString(1)))
+            .count();
+    long manifestCount =
+        batch.stream().filter(r -> MANIFEST.equalsIgnoreCase(r.getString(1))).count();
+    long manifestListCount =
+        batch.stream().filter(r -> MANIFEST_LIST.equalsIgnoreCase(r.getString(1))).count();
+
+    long contentFileCount = dataFileCount + posDeleteFileCount + eqDeleteFileCount;
+    LOG.info(
+        "Deleted {} total files in bulk",
+        contentFileCount + manifestCount + manifestListCount);
+
+    return new BaseExpireSnapshotsActionResult(
+        dataFileCount, posDeleteFileCount, eqDeleteFileCount, manifestCount, manifestListCount);
+  }
+
   private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, table.io());
     return buildValidContentFileWithTypeDF(staticTable)
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 6c6240a3b589..8a72a164bfe8 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1182,6 +1182,21 @@ public void testExpireAction() {
         action.expire());
   }
 
+  @Test
+  public void testExpireBulkDeleteWithThrowsOnUnsupportedFileIO() {
+    table.newAppend().appendFile(FILE_A).commit();
+    Set<String> bulkDeletedFiles = Sets.newHashSet();
+
+    AssertHelpers.assertThrows(
+        "Should complain about an unsupported FileIO",
+        IllegalArgumentException.class,
+        "does not support bulk deletion",
+        () ->
+            SparkActions.get()
+                .expireSnapshots(table)
+                .bulkDeleteWith(files -> files.forEach(bulkDeletedFiles::add)));
+  }
+
   @Test
   public void testUseLocalIterator() {
     table.newFastAppend().appendFile(FILE_A).commit();
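
Usage sketch (not part of the diff): with a bulk-capable FileIO such as S3FileIO, the default consumer already hands the whole batch to the FileIO's deleteFiles, so bulkDeleteWith is only needed to observe or replace that behavior. In the snippet below, "table" and "tsToExpire" are placeholder names, not identifiers from this change.

    // Sketch only: assumes table.io() implements SupportsBulkOperations.
    ExpireSnapshots.Result result =
        SparkActions.get()
            .expireSnapshots(table)
            .expireOlderThan(tsToExpire)
            .bulkDeleteWith(
                files -> {
                  // Custom hook: inspect or log the batch, then delegate
                  // the actual deletion to the FileIO's bulk API.
                  ((SupportsBulkOperations) table.io()).deleteFiles(files);
                })
            .execute();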