diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java index 15a141eb8c2c..cae88c34138d 100644 --- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java @@ -38,6 +38,15 @@ *

<p>{@link #apply()} returns a list of the snapshots that will be removed. */ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> { + /** An enum representing possible clean up levels used in snapshot expiration. */ + enum CleanupLevel { + /** Skip all file cleanup, only remove snapshot metadata. */ + NONE, + /** Clean up only metadata files (manifests, manifest lists, statistics), retain data files. */ + METADATA_ONLY, + /** Clean up both metadata and data files (default). */ + ALL + } /** * Expires a specific {@link Snapshot} identified by id. @@ -116,9 +125,32 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> { * * @param clean setting this to false will skip deleting expired manifests and files * @return this for method chaining + * @deprecated since 1.11.0, will be removed in 2.0.0; use {@link #cleanupLevel(CleanupLevel)} + * instead. */ + @Deprecated ExpireSnapshots cleanExpiredFiles(boolean clean); + /** + * Configures the cleanup level for expired files. + * + *

<p>This method provides fine-grained control over which files are cleaned up during snapshot + * expiration. + * + *

<p>Consider {@link CleanupLevel#METADATA_ONLY} when data files are shared across tables or when + * using procedures like add-files that may reference the same data files. + * + *

<p>Consider {@link CleanupLevel#NONE} when data and metadata files may be more efficiently + * removed using a distributed framework through the actions API. + * + * @param level the cleanup level to use for expired snapshots + * @return this for method chaining + */ + default ExpireSnapshots cleanupLevel(CleanupLevel level) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement cleanupLevel"); + } + /** * Enable cleaning up unused metadata, such as partition specs, schemas, etc. * diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index 049bb2268ef1..b55280a6537f 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -59,7 +59,23 @@ protected FileCleanupStrategy( this.deleteFunc = deleteFunc; } - public abstract void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration); + /** + * Clean up files that are only reachable by expired snapshots. + * + *

<p>This method is responsible for identifying and deleting files that are safe to remove based + * on the table metadata state before and after snapshot expiration. The cleanup level controls + * which types of files are eligible for deletion. + * + *

<p>Note that {@link ExpireSnapshots.CleanupLevel#NONE} is handled before reaching this method. + * + * @param beforeExpiration table metadata before snapshot expiration + * @param afterExpiration table metadata after snapshot expiration + * @param cleanupLevel controls which types of files are eligible for deletion + */ + public abstract void cleanFiles( + TableMetadata beforeExpiration, + TableMetadata afterExpiration, + ExpireSnapshots.CleanupLevel cleanupLevel); private static final Schema MANIFEST_PROJECTION = ManifestFile.schema() diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java index 6cd9b5b15938..911a94c13f28 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -48,12 +48,19 @@ class IncrementalFileCleanup extends FileCleanupStrategy { @Override @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) - public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) { - // clean up the expired snapshots: + public void cleanFiles( + TableMetadata beforeExpiration, + TableMetadata afterExpiration, + ExpireSnapshots.CleanupLevel cleanupLevel) { + // clean up required underlying files based on the expired snapshots // 1. Get a list of the snapshots that were removed // 2. Delete any data files that were deleted by those snapshots and are not in the table // 3. Delete any manifests that are no longer used by current snapshots // 4.
Delete the manifest lists + if (ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) { + LOG.info("Nothing to clean."); + return; + } Set validIds = Sets.newHashSet(); for (Snapshot snapshot : afterExpiration.snapshots()) { @@ -251,17 +258,23 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira } }); - Set filesToDelete = - findFilesToDelete( - manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById()); + if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) { + Set filesToDelete = + findFilesToDelete( + manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById()); + LOG.debug("Deleting {} data files", filesToDelete.size()); + deleteFiles(filesToDelete, "data"); + } - deleteFiles(filesToDelete, "data"); + LOG.debug("Deleting {} manifest files", manifestsToDelete.size()); deleteFiles(manifestsToDelete, "manifest"); + LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size()); deleteFiles(manifestListsToDelete, "manifest list"); if (hasAnyStatisticsFiles(beforeExpiration)) { Set expiredStatisticsFilesLocations = expiredStatisticsFilesLocations(beforeExpiration, afterExpiration); + LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size()); deleteFiles(expiredStatisticsFilesLocations, "statistics files"); } } diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java index dd4239196996..e860c896a477 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java @@ -49,7 +49,15 @@ class ReachableFileCleanup extends FileCleanupStrategy { } @Override - public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) { + public void cleanFiles( + TableMetadata beforeExpiration, + TableMetadata afterExpiration, + ExpireSnapshots.CleanupLevel cleanupLevel) { + if 
(ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) { + LOG.info("Nothing to clean."); + return; + } + Set manifestListsToDelete = Sets.newHashSet(); Set snapshotsBeforeExpiration = Sets.newHashSet(beforeExpiration.snapshots()); @@ -72,19 +80,27 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira snapshotsAfterExpiration, deletionCandidates, currentManifests::add); if (!manifestsToDelete.isEmpty()) { - Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); - deleteFiles(dataFilesToDelete, "data"); + if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) { + Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); + LOG.debug("Deleting {} data files", dataFilesToDelete.size()); + deleteFiles(dataFilesToDelete, "data"); + } + Set manifestPathsToDelete = manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet()); + LOG.debug("Deleting {} manifest files", manifestPathsToDelete.size()); deleteFiles(manifestPathsToDelete, "manifest"); } } + LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size()); deleteFiles(manifestListsToDelete, "manifest list"); if (hasAnyStatisticsFiles(beforeExpiration)) { - deleteFiles( - expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files"); + Set expiredStatisticsFilesLocations = + expiredStatisticsFilesLocations(beforeExpiration, afterExpiration); + LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size()); + deleteFiles(expiredStatisticsFilesLocations, "statistics files"); } } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 1c94b988b4ca..8e320b3d69bd 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -69,7 +69,6 @@ class RemoveSnapshots implements ExpireSnapshots { private final Set idsToRemove 
= Sets.newHashSet(); private final long now; private final long defaultMaxRefAgeMs; - private boolean cleanExpiredFiles = true; private TableMetadata base; private long defaultExpireOlderThan; private int defaultMinNumSnapshots; @@ -79,6 +78,8 @@ class RemoveSnapshots implements ExpireSnapshots { private Boolean incrementalCleanup; private boolean specifiedSnapshotId = false; private boolean cleanExpiredMetadata = false; + private boolean cleanExpiredFiles = true; + private CleanupLevel cleanupLevel = CleanupLevel.ALL; RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -103,7 +104,13 @@ class RemoveSnapshots implements ExpireSnapshots { @Override public ExpireSnapshots cleanExpiredFiles(boolean clean) { + // NONE left by an earlier cleanExpiredFiles(false) is not an explicit cleanupLevel choice + Preconditions.checkArgument( + cleanupLevel == CleanupLevel.ALL || !cleanExpiredFiles, + "Cannot set cleanExpiredFiles when cleanupLevel has already been set to: %s", + cleanupLevel); this.cleanExpiredFiles = clean; + this.cleanupLevel = clean ? CleanupLevel.ALL : CleanupLevel.NONE; return this; } @@ -167,6 +174,17 @@ public ExpireSnapshots cleanExpiredMetadata(boolean clean) { return this; } + @Override + public ExpireSnapshots cleanupLevel(CleanupLevel level) { + Preconditions.checkArgument(null != level, "Invalid cleanup level: null"); + Preconditions.checkArgument( + cleanExpiredFiles || level == CleanupLevel.NONE, + "Cannot set cleanupLevel to %s when cleanExpiredFiles was explicitly set to false", + level); + this.cleanupLevel = level; + return this; + } + @Override public List<Snapshot> apply() { TableMetadata updated = internalApply(); @@ -350,9 +368,11 @@ public void commit() { TableMetadata updated = internalApply(); ops.commit(base, updated); }); - LOG.info("Committed snapshot changes"); + LOG.info( + "Committed snapshot changes and prepare to clean up files at level={}", + cleanupLevel.name()); - if (cleanExpiredFiles && !base.snapshots().isEmpty()) { + if (CleanupLevel.NONE != cleanupLevel && !base.snapshots().isEmpty()) { cleanExpiredSnapshots(); } } @@ -384,7 +404,7 @@
private void cleanExpiredSnapshots() { : new ReachableFileCleanup( ops.io(), deleteExecutorService, planExecutorService(), deleteFunc); - cleanupStrategy.cleanFiles(base, current); + cleanupStrategy.cleanFiles(base, current, cleanupLevel); } private void validateCleanupCanBeIncremental(TableMetadata current) { diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index a473e605e2f6..c250ff82e4f7 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -1951,6 +1951,180 @@ private RemoveSnapshots removeSnapshots(Table table) { return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup); } + @TestTemplate + public void testCleanupLevelAll() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newDelete().deleteFile(FILE_A).commit(); + Snapshot secondSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Set<String> deletedFiles = Sets.newHashSet(); + + table + .expireSnapshots() + .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL) + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles) + .as("CleanupLevel.ALL should delete both metadata and data files") + .containsExactlyInAnyOrder( + firstSnapshot.manifestListLocation(), + firstSnapshot.allManifests(table.io()).get(0).path(), + secondSnapshot.manifestListLocation(), + secondSnapshot.allManifests(table.io()).get(0).path(), + FILE_A.location()); + } + + @TestTemplate + public void testCleanupLevelMetadataOnly() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newDelete().deleteFile(FILE_A).commit(); + Snapshot secondSnapshot =
table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Set<String> deletedFiles = Sets.newHashSet(); + + table + .expireSnapshots() + .cleanupLevel(ExpireSnapshots.CleanupLevel.METADATA_ONLY) + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles) + .as("CleanupLevel.METADATA_ONLY should delete only metadata files") + .containsExactlyInAnyOrder( + firstSnapshot.manifestListLocation(), + firstSnapshot.allManifests(table.io()).get(0).path(), + secondSnapshot.manifestListLocation(), + secondSnapshot.allManifests(table.io()).get(0).path()); + } + + @TestTemplate + public void testCleanupLevelNone() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newDelete().deleteFile(FILE_A).commit(); + Snapshot secondSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Set<String> deletedFiles = Sets.newHashSet(); + + table + .expireSnapshots() + .cleanupLevel(ExpireSnapshots.CleanupLevel.NONE) + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(table.snapshot(firstSnapshot.snapshotId())) + .as("First snapshot metadata should be removed even with CleanupLevel.NONE") + .isNull(); + assertThat(table.snapshot(secondSnapshot.snapshotId())) + .as("Second snapshot metadata should be removed even with CleanupLevel.NONE") + .isNull(); + + assertThat(deletedFiles).as("CleanupLevel.NONE should not delete any files").isEmpty(); + } + + @TestTemplate + public void testCleanExpiredFilesAPI() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + waitUntilAfter(firstSnapshot.timestampMillis()); + + table.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits =
waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Set<String> deletedFiles = Sets.newHashSet(); + + // Use deprecated cleanExpiredFiles(false) - should map to CleanupLevel.NONE + table + .expireSnapshots() + .cleanExpiredFiles(false) + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles) + .as("Deprecated cleanExpiredFiles(false) should behave like CleanupLevel.NONE") + .isEmpty(); + } + + @TestTemplate + public void testCannotSetCleanExpiredFilesAndCleanupLevelTogether() { + // Setting cleanExpiredFiles after cleanupLevel should fail + assertThatThrownBy( + () -> + table + .expireSnapshots() + .cleanupLevel(ExpireSnapshots.CleanupLevel.METADATA_ONLY) + .cleanExpiredFiles(false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set cleanExpiredFiles when cleanupLevel has already been set"); + + // Setting cleanupLevel after cleanExpiredFiles should also fail + assertThatThrownBy( + () -> + table + .expireSnapshots() + .cleanExpiredFiles(false) + .cleanupLevel(ExpireSnapshots.CleanupLevel.METADATA_ONLY)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set cleanupLevel to %s when cleanExpiredFiles was explicitly set to false", + ExpireSnapshots.CleanupLevel.METADATA_ONLY); + } + + @TestTemplate + public void testCanOverrideCleanExpiredFilesWithCleanupLevel() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newDelete().deleteFile(FILE_A).commit(); + Snapshot secondSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Set<String> deletedFiles = Sets.newHashSet(); + + // Setting cleanExpiredFiles(true) first, then overriding with cleanupLevel should work + // because cleanExpiredFiles(true) doesn't create a conflicting intention + table + .expireSnapshots() +
.cleanExpiredFiles(true) // This sets cleanupLevel to ALL + .cleanupLevel( + ExpireSnapshots.CleanupLevel.METADATA_ONLY) // This should override to METADATA_ONLY + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + assertThat(deletedFiles) + .as("Should only delete metadata files when overridden to METADATA_ONLY") + .containsExactlyInAnyOrder( + firstSnapshot.manifestListLocation(), + firstSnapshot.allManifests(table.io()).get(0).path(), + secondSnapshot.manifestListLocation(), + secondSnapshot.allManifests(table.io()).get(0).path()); + + // FILE_A should NOT be deleted because cleanupLevel was overridden to METADATA_ONLY + assertThat(deletedFiles).doesNotContain(FILE_A.location()); + } + + @TestTemplate + public void testCleanupLevelNullValidation() { + assertThatThrownBy(() -> table.expireSnapshots().cleanupLevel(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid cleanup level: null"); + } + private StatisticsFile writeStatsFile( long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) throws IOException {