Skip to content
32 changes: 32 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@
* <p>{@link #apply()} returns a list of the snapshots that will be removed.
*/
public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
/** An enum representing possible clean up levels used in snapshot expiration. */
enum CleanupLevel {
/** Skip all file cleanup, only remove snapshot metadata. */
NONE,
/** Clean up only metadata files (manifests, manifest lists, statistics), retain data files. */
METADATA_ONLY,
/** Clean up both metadata and data files (default). */
ALL
}

/**
* Expires a specific {@link Snapshot} identified by id.
Expand Down Expand Up @@ -116,9 +125,32 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
*
* @param clean setting this to false will skip deleting expired manifests and files
* @return this for method chaining
* @deprecated since 1.11.0, will be removed in 2.0.0; use {@link #cleanupLevel(CleanupLevel)}
* instead.
*/
@Deprecated
ExpireSnapshots cleanExpiredFiles(boolean clean);

/**
* Configures the cleanup level for expired files.
*
* <p>This method provides fine-grained control over which files are cleaned up during snapshot
* expiration.
*
* <p>Consider {@link CleanupLevel#METADATA_ONLY} when data files are shared across tables or when
* using procedures like add-files that may reference the same data files.
*
* <p>Consider {@link CleanupLevel#NONE} when data and metadata files may be more efficiently
* removed using a distributed framework through the actions API.
*
* @param level the cleanup level to use for expired snapshots
* @return this for method chaining
*/
default ExpireSnapshots cleanupLevel(CleanupLevel level) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement cleanupLevel");
}

/**
* Enable cleaning up unused metadata, such as partition specs, schemas, etc.
*
Expand Down
18 changes: 17 additions & 1 deletion core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,23 @@ protected FileCleanupStrategy(
this.deleteFunc = deleteFunc;
}

public abstract void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration);
/**
* Clean up files that are only reachable by expired snapshots.
*
* <p>This method is responsible for identifying and deleting files that are safe to remove based
* on the table metadata state before and after snapshot expiration. The cleanup level controls
* which types of files are eligible for deletion.
*
* <p>Note that {@link ExpireSnapshots.CleanupLevel#NONE} is handled before reaching this method
*
* @param beforeExpiration table metadata before snapshot expiration
* @param afterExpiration table metadata after snapshot expiration
* @param cleanupLevel controls which types of files are eligible for deletion
*/
public abstract void cleanFiles(
TableMetadata beforeExpiration,
TableMetadata afterExpiration,
ExpireSnapshots.CleanupLevel cleanupLevel);

private static final Schema MANIFEST_PROJECTION =
ManifestFile.schema()
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,19 @@ class IncrementalFileCleanup extends FileCleanupStrategy {

@Override
@SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) {
// clean up the expired snapshots:
public void cleanFiles(
TableMetadata beforeExpiration,
TableMetadata afterExpiration,
ExpireSnapshots.CleanupLevel cleanupLevel) {
// clean up required underlying files based on the expired snapshots
// 1. Get a list of the snapshots that were removed
// 2. Delete any data files that were deleted by those snapshots and are not in the table
// 3. Delete any manifests that are no longer used by current snapshots
// 4. Delete the manifest lists
if (ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) {
LOG.info("Nothing to clean.");
return;
}

Set<Long> validIds = Sets.newHashSet();
for (Snapshot snapshot : afterExpiration.snapshots()) {
Expand Down Expand Up @@ -251,17 +258,23 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
}
});

Set<String> filesToDelete =
findFilesToDelete(
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());
if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) {
Set<String> filesToDelete =
findFilesToDelete(
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());
LOG.debug("Deleting {} data files", filesToDelete.size());
deleteFiles(filesToDelete, "data");
}

deleteFiles(filesToDelete, "data");
LOG.debug("Deleting {} manifest files", manifestsToDelete.size());
deleteFiles(manifestsToDelete, "manifest");
LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size());
deleteFiles(manifestListsToDelete, "manifest list");

if (hasAnyStatisticsFiles(beforeExpiration)) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size());
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
}
}
Expand Down
26 changes: 21 additions & 5 deletions core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ class ReachableFileCleanup extends FileCleanupStrategy {
}

@Override
public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) {
public void cleanFiles(
TableMetadata beforeExpiration,
TableMetadata afterExpiration,
ExpireSnapshots.CleanupLevel cleanupLevel) {
if (ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) {
LOG.info("Nothing to clean.");
return;
}

Set<String> manifestListsToDelete = Sets.newHashSet();

Set<Snapshot> snapshotsBeforeExpiration = Sets.newHashSet(beforeExpiration.snapshots());
Expand All @@ -72,19 +80,27 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
snapshotsAfterExpiration, deletionCandidates, currentManifests::add);

if (!manifestsToDelete.isEmpty()) {
Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
deleteFiles(dataFilesToDelete, "data");
if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) {
Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
LOG.debug("Deleting {} data files", dataFilesToDelete.size());
deleteFiles(dataFilesToDelete, "data");
}

Set<String> manifestPathsToDelete =
manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());
LOG.debug("Deleting {} manifest files", manifestPathsToDelete.size());
deleteFiles(manifestPathsToDelete, "manifest");
}
}

LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size());
deleteFiles(manifestListsToDelete, "manifest list");

if (hasAnyStatisticsFiles(beforeExpiration)) {
deleteFiles(
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files");
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size());
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
}
}

Expand Down
27 changes: 23 additions & 4 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class RemoveSnapshots implements ExpireSnapshots {
private final Set<Long> idsToRemove = Sets.newHashSet();
private final long now;
private final long defaultMaxRefAgeMs;
private boolean cleanExpiredFiles = true;
private TableMetadata base;
private long defaultExpireOlderThan;
private int defaultMinNumSnapshots;
Expand All @@ -79,6 +78,8 @@ class RemoveSnapshots implements ExpireSnapshots {
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;
private boolean cleanExpiredMetadata = false;
private boolean cleanExpiredFiles = true;
private CleanupLevel cleanupLevel = CleanupLevel.ALL;

RemoveSnapshots(TableOperations ops) {
this.ops = ops;
Expand All @@ -103,7 +104,12 @@ class RemoveSnapshots implements ExpireSnapshots {

@Override
public ExpireSnapshots cleanExpiredFiles(boolean clean) {
Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about throwing here, why not just override whatever the cleanup mode was set to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Eduard, I added such validation mainly want to prevent double intention where one set both the cleanExpired files as well as cleanupMode, so intention is not clear and override might be risky, this could lead to some unexpected results so throw early maybe helpful.

This can be found more in my unit test named testCannotSetCleanExpiredFilesAndCleanModeTogether in https://github.com/apache/iceberg/pull/14287/files?new_files_changed=true#diff-35ed4072da58b6d638da909476780a4da8d97390bd56e41f8555b15b91499b64R2071-R2091.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems ok to throw an exception here because we don't want to users to call both setters. only one should be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the condition may not be precise/perfect. e.g., if the client called cleanupLevel(ALL) (default value), it would still allow this method to go through.

the other way is to default cleanupLevel to null. but we would need to do a bit if-else check during read to apply the value.

maybe the complexity is not worth it. I am wondering if it is simpler to just go with what @nastra suggested. just rely on API deprecation to move users away from the old API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's some nuance here and value to keep:

Take what I included in the unit test for an example here, use preconditions check here prevent the case where both cleanupLevel(ExpireSnapshots.CleanupLevel.METADATA_ONLY) and cleanExpiredFiles(false) is configured on snapshot expiration, as the intention is unclear at the moment, override to either NONE or METADATA_ONLY could potentially result in undesired results.

Although unlikely, for the corner case discussed here when client called cleanupLevel(ALL) and also set the cleanExpiredFiles

  • if cleanExpiredFiles = true, then cleanupLevel ends up resolve to ALL and logic is equivalent
  • if cleanExpiredFiles = false, we allow such override to happen and end up with cleanupLevel=None and retain all files, I think it's acceptable as we are moving from most restrictive and least restrictive, and those files can be later cleaned with orphan removal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current impleemntation is reasonable so long as there's no breakages when someone upgrades to 1.11 and using the deprecated API, which it looks like there's not since the condition is based on whether someone additionally set the new API. I also prefer the approach of failing if a non-default cleanup level is set and the old one is also set because it's pretty unlikely a user intended to do that and it forces them to resolve that ambiguity that @dramaticlly mentioned.

cleanupLevel == CleanupLevel.ALL,
"Cannot set cleanExpiredFiles when cleanupLevel has already been set to: %s",
cleanupLevel);
this.cleanExpiredFiles = clean;
this.cleanupLevel = clean ? CleanupLevel.ALL : CleanupLevel.NONE;
return this;
}

Expand Down Expand Up @@ -167,6 +173,17 @@ public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
return this;
}

@Override
public ExpireSnapshots cleanupLevel(CleanupLevel level) {
Preconditions.checkArgument(null != level, "Invalid cleanup level: null");
Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as mentioned earlier about throwing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unresolving this one, I do think this is a case where we should just be consistent with what's done in the other options in this API, which is just override what was previously set. e.g. we can set cleanExpiredFiles multiple times, and it'll just take the last. Any reason why this particular one should be different and throw @dramaticlly ?

Copy link
Contributor Author

@dramaticlly dramaticlly Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this mostly want to avoid ambiguous intention when user set both options together regardless of which option is chained first.

i.e both shall fail with exception to ask user for further clarification

table.expireSnapshots()
.cleanupLevel(ExpireSnapshots.CleanupLevel.METADATA_ONLY)
.cleanExpiredFiles(false)

and

table
.expireSnapshots()
.cleanExpiredFiles(false)
.cleanupLevel(ExpireSnapshots.CleanupLevel.METADATA_ONLY)

Updated preconditions to better assertion condition and message

cleanExpiredFiles || level == CleanupLevel.NONE,
"Cannot set cleanupLevel to %s when cleanExpiredFiles was explicitly set to false",
level);
this.cleanupLevel = level;
return this;
}

@Override
public List<Snapshot> apply() {
TableMetadata updated = internalApply();
Expand Down Expand Up @@ -350,9 +367,11 @@ public void commit() {
TableMetadata updated = internalApply();
ops.commit(base, updated);
});
LOG.info("Committed snapshot changes");
LOG.info(
"Committed snapshot changes and prepare to clean up files at level={}",
cleanupLevel.name());

if (cleanExpiredFiles && !base.snapshots().isEmpty()) {
if (CleanupLevel.NONE != cleanupLevel && !base.snapshots().isEmpty()) {
cleanExpiredSnapshots();
}
}
Expand Down Expand Up @@ -384,7 +403,7 @@ private void cleanExpiredSnapshots() {
: new ReachableFileCleanup(
ops.io(), deleteExecutorService, planExecutorService(), deleteFunc);

cleanupStrategy.cleanFiles(base, current);
cleanupStrategy.cleanFiles(base, current, cleanupLevel);
}

private void validateCleanupCanBeIncremental(TableMetadata current) {
Expand Down
Loading