From b3e3a475b930966cedbc3604bfeedbf7babe6f45 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Mon, 29 Aug 2022 13:49:04 -0700 Subject: [PATCH] Core: Expire Snapshots reachability analysis --- .../apache/iceberg/FileCleanupStrategy.java | 82 +++++ .../iceberg/IncrementalFileCleanup.java | 327 +++++++++++++++++ .../apache/iceberg/ReachableFileCleanup.java | 164 +++++++++ .../org/apache/iceberg/RemoveSnapshots.java | 335 +----------------- .../apache/iceberg/TestRemoveSnapshots.java | 122 +++---- 5 files changed, 649 insertions(+), 381 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java create mode 100644 core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java create mode 100644 core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java new file mode 100644 index 000000000000..c44acd87c12e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("checkstyle:VisibilityModifier") +abstract class FileCleanupStrategy { + private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class); + + protected final FileIO fileIO; + protected final ExecutorService planExecutorService; + private final Consumer deleteFunc; + private final ExecutorService deleteExecutorService; + + protected FileCleanupStrategy( + FileIO fileIO, + ExecutorService deleteExecutorService, + ExecutorService planExecutorService, + Consumer deleteFunc) { + this.fileIO = fileIO; + this.deleteExecutorService = deleteExecutorService; + this.planExecutorService = planExecutorService; + this.deleteFunc = deleteFunc; + } + + public abstract void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration); + + private static final Schema MANIFEST_PROJECTION = + ManifestFile.schema() + .select( + "manifest_path", "manifest_length", "added_snapshot_id", "deleted_data_files_count"); + + protected CloseableIterable readManifestFiles(Snapshot snapshot) { + if (snapshot.manifestListLocation() != null) { + return Avro.read(fileIO.newInputFile(snapshot.manifestListLocation())) + .rename("manifest_file", GenericManifestFile.class.getName()) + .classLoader(GenericManifestFile.class.getClassLoader()) + .project(MANIFEST_PROJECTION) + .reuseContainers(true) + .build(); + } else { + return CloseableIterable.withNoopClose(snapshot.allManifests(fileIO)); + } + } + + protected void deleteFiles(Set pathsToDelete, String fileType) { + Tasks.foreach(pathsToDelete) + .executeWith(deleteExecutorService) + .retry(3) + .stopRetryOn(NotFoundException.class) + .suppressFailureWhenFinished() + .onFailure( + (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown)) + .run(deleteFunc::accept); + } +} diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java new file mode 100644 index 000000000000..3a97356b705a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class IncrementalFileCleanup extends FileCleanupStrategy { + private static final Logger LOG = LoggerFactory.getLogger(IncrementalFileCleanup.class); + + IncrementalFileCleanup( + FileIO fileIO, + ExecutorService deleteExecutorService, + ExecutorService planExecutorService, + Consumer deleteFunc) { + super(fileIO, deleteExecutorService, planExecutorService, deleteFunc); + } + + @Override + @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) + public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) { + if (afterExpiration.refs().size() > 1) { + throw new UnsupportedOperationException( + "Cannot incrementally clean files for tables with more than 1 ref"); + } + + // clean up the expired snapshots: + // 1. Get a list of the snapshots that were removed + // 2. Delete any data files that were deleted by those snapshots and are not in the table + // 3. Delete any manifests that are no longer used by current snapshots + // 4. Delete the manifest lists + + Set validIds = Sets.newHashSet(); + for (Snapshot snapshot : afterExpiration.snapshots()) { + validIds.add(snapshot.snapshotId()); + } + + Set expiredIds = Sets.newHashSet(); + for (Snapshot snapshot : beforeExpiration.snapshots()) { + long snapshotId = snapshot.snapshotId(); + if (!validIds.contains(snapshotId)) { + // the snapshot was expired + LOG.info("Expired snapshot: {}", snapshot); + expiredIds.add(snapshotId); + } + } + + if (expiredIds.isEmpty()) { + // if no snapshots were expired, skip cleanup + return; + } + + SnapshotRef branchToCleanup = Iterables.getFirst(beforeExpiration.refs().values(), null); + if (branchToCleanup == null) { + return; + } + + Snapshot latest = beforeExpiration.snapshot(branchToCleanup.snapshotId()); + List snapshots = afterExpiration.snapshots(); + + // this is the set of ancestors of the current table state. when removing snapshots, this must + // only remove files that were deleted in an ancestor of the current table state to avoid + // physically deleting files that were logically deleted in a commit that was rolled back. + Set ancestorIds = + Sets.newHashSet(SnapshotUtil.ancestorIds(latest, beforeExpiration::snapshot)); + + Set pickedAncestorSnapshotIds = Sets.newHashSet(); + for (long snapshotId : ancestorIds) { + String sourceSnapshotId = + beforeExpiration + .snapshot(snapshotId) + .summary() + .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP); + if (sourceSnapshotId != null) { + // protect any snapshot that was cherry-picked into the current table state + pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId)); + } + } + + // find manifests to clean up that are still referenced by a valid snapshot, but written by an + // expired snapshot + Set validManifests = Sets.newHashSet(); + Set manifestsToScan = Sets.newHashSet(); + + // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete + // as much of the delete work as possible and avoid orphaned data or manifest files. + Tasks.foreach(snapshots) + .retry(3) + .suppressFailureWhenFinished() + .onFailure( + (snapshot, exc) -> + LOG.warn( + "Failed on snapshot {} while reading manifest list: {}", + snapshot.snapshotId(), + snapshot.manifestListLocation(), + exc)) + .run( + snapshot -> { + try (CloseableIterable manifests = readManifestFiles(snapshot)) { + for (ManifestFile manifest : manifests) { + validManifests.add(manifest.path()); + + long snapshotId = manifest.snapshotId(); + // whether the manifest was created by a valid snapshot (true) or an expired + // snapshot (false) + boolean fromValidSnapshots = validIds.contains(snapshotId); + // whether the snapshot that created the manifest was an ancestor of the table + // state + boolean isFromAncestor = ancestorIds.contains(snapshotId); + // whether the changes in this snapshot have been picked into the current table + // state + boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId); + // if the snapshot that wrote this manifest is no longer valid (has expired), + // then delete its deleted files. note that this is only for expired snapshots + // that are in the + // current table state + if (!fromValidSnapshots + && (isFromAncestor || isPicked) + && manifest.hasDeletedFiles()) { + manifestsToScan.add(manifest.copy()); + } + } + + } catch (IOException e) { + throw new RuntimeIOException( + e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); + } + }); + + // find manifests to clean up that were only referenced by snapshots that have expired + Set manifestListsToDelete = Sets.newHashSet(); + Set manifestsToDelete = Sets.newHashSet(); + Set manifestsToRevert = Sets.newHashSet(); + Tasks.foreach(beforeExpiration.snapshots()) + .retry(3) + .suppressFailureWhenFinished() + .onFailure( + (snapshot, exc) -> + LOG.warn( + "Failed on snapshot {} while reading manifest list: {}", + snapshot.snapshotId(), + snapshot.manifestListLocation(), + exc)) + .run( + snapshot -> { + long snapshotId = snapshot.snapshotId(); + if (!validIds.contains(snapshotId)) { + // determine whether the changes in this snapshot are in the current table state + if (pickedAncestorSnapshotIds.contains(snapshotId)) { + // this snapshot was cherry-picked into the current table state, so skip cleaning + // it up. + // its changes will expire when the picked snapshot expires. + // A -- C -- D (source=B) + // `- B <-- this commit + return; + } + + long sourceSnapshotId = + PropertyUtil.propertyAsLong( + snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1); + if (ancestorIds.contains(sourceSnapshotId)) { + // this commit was cherry-picked from a commit that is in the current table state. + // do not clean up its changes because it would revert data file additions that + // are in the current + // table. + // A -- B -- C + // `- D (source=B) <-- this commit + return; + } + + if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) { + // this commit was cherry-picked from a commit that is in the current table state. + // do not clean up its changes because it would revert data file additions that + // are in the current + // table. + // A -- C -- E (source=B) + // `- B `- D (source=B) <-- this commit + return; + } + + // find any manifests that are no longer needed + try (CloseableIterable manifests = readManifestFiles(snapshot)) { + for (ManifestFile manifest : manifests) { + if (!validManifests.contains(manifest.path())) { + manifestsToDelete.add(manifest.path()); + + boolean isFromAncestor = ancestorIds.contains(manifest.snapshotId()); + boolean isFromExpiringSnapshot = expiredIds.contains(manifest.snapshotId()); + + if (isFromAncestor && manifest.hasDeletedFiles()) { + // Only delete data files that were deleted in by an expired snapshot if + // that napshot is an ancestor of the current table state. Otherwise, a + // snapshot + // that deleted files and was rolled back will delete files that could be in + // the current + // table state. + manifestsToScan.add(manifest.copy()); + } + + if (!isFromAncestor && isFromExpiringSnapshot && manifest.hasAddedFiles()) { + // Because the manifest was written by a snapshot that is not an ancestor of + // the current table state, the files added in this manifest can be removed. + // The + // extra check whether the manifest was written by a known snapshot that was + // expired in this commit ensures that the full ancestor list between when + // the snapshot + // was written and this expiration is known and there is no missing history. + // If + // history were missing, then the snapshot could be an ancestor of the table + // state + // but the ancestor ID set would not contain it and this would be unsafe. + manifestsToRevert.add(manifest.copy()); + } + } + } + } catch (IOException e) { + throw new RuntimeIOException( + e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); + } + + // add the manifest list to the delete set, if present + if (snapshot.manifestListLocation() != null) { + manifestListsToDelete.add(snapshot.manifestListLocation()); + } + } + }); + + Set filesToDelete = + findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); + + deleteFiles(filesToDelete, "data"); + LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete)); + LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete)); + deleteFiles(manifestsToDelete, "manifest"); + deleteFiles(manifestListsToDelete, "manifest list"); + } + + private Set findFilesToDelete( + Set manifestsToScan, + Set manifestsToRevert, + Set validIds, + TableMetadata current) { + Set filesToDelete = ConcurrentHashMap.newKeySet(); + Tasks.foreach(manifestsToScan) + .retry(3) + .suppressFailureWhenFinished() + .executeWith(planExecutorService) + .onFailure( + (item, exc) -> + LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) + .run( + manifest -> { + // the manifest has deletes, scan it to find files to delete + try (ManifestReader reader = + ManifestFiles.open(manifest, fileIO, current.specsById())) { + for (ManifestEntry entry : reader.entries()) { + // if the snapshot ID of the DELETE entry is no longer valid, the data can be + // deleted + if (entry.status() == ManifestEntry.Status.DELETED + && !validIds.contains(entry.snapshotId())) { + // use toString to ensure the path will not change (Utf8 is reused) + filesToDelete.add(entry.file().path().toString()); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); + } + }); + + Tasks.foreach(manifestsToRevert) + .retry(3) + .suppressFailureWhenFinished() + .executeWith(planExecutorService) + .onFailure( + (item, exc) -> + LOG.warn("Failed to get added files: this may cause orphaned data files", exc)) + .run( + manifest -> { + // the manifest has deletes, scan it to find files to delete + try (ManifestReader reader = + ManifestFiles.open(manifest, fileIO, current.specsById())) { + for (ManifestEntry entry : reader.entries()) { + // delete any ADDED file from manifests that were reverted + if (entry.status() == ManifestEntry.Status.ADDED) { + // use toString to ensure the path will not change (Utf8 is reused) + filesToDelete.add(entry.file().path().toString()); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); + } + }); + + return filesToDelete; + } +} diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java new file mode 100644 index 000000000000..75b6de6659b0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * File cleanup strategy for snapshot expiration which determines, via an in-memory reference set, + * metadata and data files that are not reachable given the previous and current table states. + */ +class ReachableFileCleanup extends FileCleanupStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(ReachableFileCleanup.class); + + ReachableFileCleanup( + FileIO fileIO, + ExecutorService deleteExecutorService, + ExecutorService planExecutorService, + Consumer deleteFunc) { + super(fileIO, deleteExecutorService, planExecutorService, deleteFunc); + } + + @Override + public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpiration) { + Set manifestListsToDelete = Sets.newHashSet(); + + Set snapshotsBeforeExpiration = Sets.newHashSet(beforeExpiration.snapshots()); + Set snapshotsAfterExpiration = Sets.newHashSet(afterExpiration.snapshots()); + Set expiredSnapshots = Sets.newHashSet(); + for (Snapshot snapshot : snapshotsBeforeExpiration) { + if (!snapshotsAfterExpiration.contains(snapshot)) { + expiredSnapshots.add(snapshot); + if (snapshot.manifestListLocation() != null) { + manifestListsToDelete.add(snapshot.manifestListLocation()); + } + } + } + + Set candidateManifestFilesForDeletion = readManifests(expiredSnapshots); + Set manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration); + + Set manifestsToDelete = Sets.newHashSet(); + for (ManifestFile candidateManifestFile : candidateManifestFilesForDeletion) { + if (!manifestFilesAfterExpiration.contains(candidateManifestFile)) { + manifestsToDelete.add(candidateManifestFile); + } + } + + Set dataFilesToDelete = + findFilesToDelete(manifestsToDelete, manifestFilesAfterExpiration); + deleteFiles(dataFilesToDelete, "data"); + Set manifestPathsToDelete = + manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet()); + + deleteFiles(manifestPathsToDelete, "manifest"); + deleteFiles(manifestListsToDelete, "manifest list"); + } + + private Set readManifests(Set snapshots) { + Set manifestFiles = Sets.newHashSet(); + for (Snapshot snapshot : snapshots) { + try (CloseableIterable manifestFilesForSnapshot = readManifestFiles(snapshot)) { + for (ManifestFile manifestFile : manifestFilesForSnapshot) { + manifestFiles.add(manifestFile.copy()); + } + } catch (IOException e) { + throw new RuntimeIOException( + e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); + } + } + + return manifestFiles; + } + + // Helper to determine data files to delete + private Set findFilesToDelete( + Set manifestFilesToDelete, Set currentManifestFiles) { + Set filesToDelete = ConcurrentHashMap.newKeySet(); + + Tasks.foreach(manifestFilesToDelete) + .retry(3) + .suppressFailureWhenFinished() + .executeWith(planExecutorService) + .onFailure( + (item, exc) -> + LOG.warn( + "Failed to determine live files in manifest {}: this may cause orphaned data files", + item.path(), + exc)) + .run( + manifest -> { + try (CloseableIterable paths = ManifestFiles.readPaths(manifest, fileIO)) { + paths.forEach(filesToDelete::add); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); + } + }); + + if (filesToDelete.isEmpty()) { + return filesToDelete; + } + + try { + Tasks.foreach(currentManifestFiles) + .retry(3) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutorService) + .onFailure( + (item, exc) -> + LOG.warn( + "Failed to determine live files in manifest {}: this may cause orphaned data files", + item.path(), + exc)) + .run( + manifest -> { + if (filesToDelete.isEmpty()) { + return; + } + + // Remove all the live files from the candidate deletion set + try (CloseableIterable paths = ManifestFiles.readPaths(manifest, fileIO)) { + paths.forEach(filesToDelete::remove); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); + } + }); + + } catch (Throwable e) { + LOG.warn("Failed to determine the data files to be removed", e); + return Sets.newHashSet(); + } + + return filesToDelete; + } +} diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index b996822aaf03..68c5a2b4e3f4 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -35,21 +35,14 @@ import static org.apache.iceberg.TableProperties.MIN_SNAPSHOTS_TO_KEEP; import static org.apache.iceberg.TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT; -import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -90,6 +83,7 @@ public void accept(String file) { private Consumer deleteFunc = defaultDelete; private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE; private ExecutorService planExecutorService = ThreadPools.getWorkerPool(); + private Boolean incrementalCleanup; RemoveSnapshots(TableOperations ops) { this.ops = ops; @@ -310,11 +304,6 @@ public void commit() { .run( item -> { TableMetadata updated = internalApply(); - if (cleanExpiredFiles && updated.refs().size() > 1) { - throw new UnsupportedOperationException( - "Cannot incrementally clean files for tables with more than 1 ref"); - } - ops.commit(base, updated); }); LOG.info("Committed snapshot changes"); @@ -326,319 +315,25 @@ public void commit() { } } - private void cleanExpiredSnapshots() { - // clean up the expired snapshots: - // 1. Get a list of the snapshots that were removed - // 2. Delete any data files that were deleted by those snapshots and are not in the table - // 3. Delete any manifests that are no longer used by current snapshots - // 4. Delete the manifest lists + ExpireSnapshots withIncrementalCleanup(boolean useIncrementalCleanup) { + this.incrementalCleanup = useIncrementalCleanup; + return this; + } + private void cleanExpiredSnapshots() { TableMetadata current = ops.refresh(); - Set validIds = Sets.newHashSet(); - for (Snapshot snapshot : current.snapshots()) { - validIds.add(snapshot.snapshotId()); - } - - Set expiredIds = Sets.newHashSet(); - for (Snapshot snapshot : base.snapshots()) { - long snapshotId = snapshot.snapshotId(); - if (!validIds.contains(snapshotId)) { - // the snapshot was expired - LOG.info("Expired snapshot: {}", snapshot); - expiredIds.add(snapshotId); - } - } - - if (expiredIds.isEmpty()) { - // if no snapshots were expired, skip cleanup - return; - } - - LOG.info("Committed snapshot changes; cleaning up expired manifests and data files."); - - removeExpiredFiles(current.snapshots(), validIds, expiredIds); - } - - @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) - private void removeExpiredFiles( - List snapshots, Set validIds, Set expiredIds) { - // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete - // as much of the delete work as possible and avoid orphaned data or manifest files. - - // this is the set of ancestors of the current table state. when removing snapshots, this must - // only remove files that were deleted in an ancestor of the current table state to avoid - // physically deleting files that were logically deleted in a commit that was rolled back. - Set ancestorIds = - Sets.newHashSet(SnapshotUtil.ancestorIds(base.currentSnapshot(), base::snapshot)); - - Set pickedAncestorSnapshotIds = Sets.newHashSet(); - for (long snapshotId : ancestorIds) { - String sourceSnapshotId = - base.snapshot(snapshotId).summary().get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP); - if (sourceSnapshotId != null) { - // protect any snapshot that was cherry-picked into the current table state - pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId)); - } + if (incrementalCleanup == null) { + incrementalCleanup = current.refs().size() == 1; } - // find manifests to clean up that are still referenced by a valid snapshot, but written by an - // expired snapshot - Set validManifests = Sets.newHashSet(); - Set manifestsToScan = Sets.newHashSet(); - Tasks.foreach(snapshots) - .retry(3) - .suppressFailureWhenFinished() - .onFailure( - (snapshot, exc) -> - LOG.warn( - "Failed on snapshot {} while reading manifest list: {}", - snapshot.snapshotId(), - snapshot.manifestListLocation(), - exc)) - .run( - snapshot -> { - try (CloseableIterable manifests = readManifestFiles(snapshot)) { - for (ManifestFile manifest : manifests) { - validManifests.add(manifest.path()); - - long snapshotId = manifest.snapshotId(); - // whether the manifest was created by a valid snapshot (true) or an expired - // snapshot (false) - boolean fromValidSnapshots = validIds.contains(snapshotId); - // whether the snapshot that created the manifest was an ancestor of the table - // state - boolean isFromAncestor = ancestorIds.contains(snapshotId); - // whether the changes in this snapshot have been picked into the current table - // state - boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId); - // if the snapshot that wrote this manifest is no longer valid (has expired), - // then delete its deleted files. note that this is only for expired snapshots - // that are in the - // current table state - if (!fromValidSnapshots - && (isFromAncestor || isPicked) - && manifest.hasDeletedFiles()) { - manifestsToScan.add(manifest.copy()); - } - } - - } catch (IOException e) { - throw new RuntimeIOException( - e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); - } - }); + FileCleanupStrategy cleanupStrategy = + incrementalCleanup + ? new IncrementalFileCleanup( + ops.io(), deleteExecutorService, planExecutorService, deleteFunc) + : new ReachableFileCleanup( + ops.io(), deleteExecutorService, planExecutorService, deleteFunc); - // find manifests to clean up that were only referenced by snapshots that have expired - Set manifestListsToDelete = Sets.newHashSet(); - Set manifestsToDelete = Sets.newHashSet(); - Set manifestsToRevert = Sets.newHashSet(); - Tasks.foreach(base.snapshots()) - .retry(3) - .suppressFailureWhenFinished() - .onFailure( - (snapshot, exc) -> - LOG.warn( - "Failed on snapshot {} while reading manifest list: {}", - snapshot.snapshotId(), - snapshot.manifestListLocation(), - exc)) - .run( - snapshot -> { - long snapshotId = snapshot.snapshotId(); - if (!validIds.contains(snapshotId)) { - // determine whether the changes in this snapshot are in the current table state - if (pickedAncestorSnapshotIds.contains(snapshotId)) { - // this snapshot was cherry-picked into the current table state, so skip cleaning - // it up. - // its changes will expire when the picked snapshot expires. - // A -- C -- D (source=B) - // `- B <-- this commit - return; - } - - long sourceSnapshotId = - PropertyUtil.propertyAsLong( - snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1); - if (ancestorIds.contains(sourceSnapshotId)) { - // this commit was cherry-picked from a commit that is in the current table state. - // do not clean up its - // changes because it would revert data file additions that are in the current - // table. - // A -- B -- C - // `- D (source=B) <-- this commit - return; - } - - if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) { - // this commit was cherry-picked from a commit that is in the current table state. - // do not clean up its - // changes because it would revert data file additions that are in the current - // table. - // A -- C -- E (source=B) - // `- B `- D (source=B) <-- this commit - return; - } - - // find any manifests that are no longer needed - try (CloseableIterable manifests = readManifestFiles(snapshot)) { - for (ManifestFile manifest : manifests) { - if (!validManifests.contains(manifest.path())) { - manifestsToDelete.add(manifest.path()); - - boolean isFromAncestor = ancestorIds.contains(manifest.snapshotId()); - boolean isFromExpiringSnapshot = expiredIds.contains(manifest.snapshotId()); - - if (isFromAncestor && manifest.hasDeletedFiles()) { - // Only delete data files that were deleted in by an expired snapshot if - // that - // snapshot is an ancestor of the current table state. Otherwise, a snapshot - // that - // deleted files and was rolled back will delete files that could be in the - // current - // table state. - manifestsToScan.add(manifest.copy()); - } - - if (!isFromAncestor && isFromExpiringSnapshot && manifest.hasAddedFiles()) { - // Because the manifest was written by a snapshot that is not an ancestor of - // the - // current table state, the files added in this manifest can be removed. The - // extra - // check whether the manifest was written by a known snapshot that was - // expired in - // this commit ensures that the full ancestor list between when the snapshot - // was - // written and this expiration is known and there is no missing history. If - // history - // were missing, then the snapshot could be an ancestor of the table state - // but the - // ancestor ID set would not contain it and this would be unsafe. - manifestsToRevert.add(manifest.copy()); - } - } - } - } catch (IOException e) { - throw new RuntimeIOException( - e, "Failed to close manifest list: %s", snapshot.manifestListLocation()); - } - - // add the manifest list to the delete set, if present - if (snapshot.manifestListLocation() != null) { - manifestListsToDelete.add(snapshot.manifestListLocation()); - } - } - }); - deleteDataFiles(manifestsToScan, manifestsToRevert, validIds); - deleteMetadataFiles(manifestsToDelete, manifestListsToDelete); - } - - private void deleteMetadataFiles( - Set manifestsToDelete, Set manifestListsToDelete) { - LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete)); - LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete)); - - Tasks.foreach(manifestsToDelete) - .executeWith(deleteExecutorService) - .retry(3) - .stopRetryOn(NotFoundException.class) - .suppressFailureWhenFinished() - .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc)) - .run(deleteFunc::accept); - - Tasks.foreach(manifestListsToDelete) - .executeWith(deleteExecutorService) - .retry(3) - .stopRetryOn(NotFoundException.class) - .suppressFailureWhenFinished() - .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc)) - .run(deleteFunc::accept); - } - - private void deleteDataFiles( - Set manifestsToScan, Set manifestsToRevert, Set validIds) { - Set filesToDelete = findFilesToDelete(manifestsToScan, manifestsToRevert, validIds); - Tasks.foreach(filesToDelete) - .executeWith(deleteExecutorService) - .retry(3) - .stopRetryOn(NotFoundException.class) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.warn("Delete failed for data file: {}", file, exc)) - .run(file -> deleteFunc.accept(file)); - } - - private Set findFilesToDelete( - Set manifestsToScan, Set manifestsToRevert, Set validIds) { - Set filesToDelete = ConcurrentHashMap.newKeySet(); - Tasks.foreach(manifestsToScan) - .retry(3) - .suppressFailureWhenFinished() - .executeWith(planExecutorService) - .onFailure( - (item, exc) -> - LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) - .run( - manifest -> { - // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = - ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) { - for (ManifestEntry entry : reader.entries()) { - // if the snapshot ID of the DELETE entry is no longer valid, the data can be - // deleted - if (entry.status() == ManifestEntry.Status.DELETED - && !validIds.contains(entry.snapshotId())) { - // use toString to ensure the path will not change (Utf8 is reused) - filesToDelete.add(entry.file().path().toString()); - } - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); - } - }); - - Tasks.foreach(manifestsToRevert) - .retry(3) - .suppressFailureWhenFinished() - .executeWith(planExecutorService) - .onFailure( - (item, exc) -> - LOG.warn("Failed to get added files: this may cause orphaned data files", exc)) - .run( - manifest -> { - // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = - ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) { - for (ManifestEntry entry : reader.entries()) { - // delete any ADDED file from manifests that were reverted - if (entry.status() == ManifestEntry.Status.ADDED) { - // use toString to ensure the path will not change (Utf8 is reused) - filesToDelete.add(entry.file().path().toString()); - } - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); - } - }); - - return filesToDelete; - } - - private static final Schema MANIFEST_PROJECTION = - ManifestFile.schema() - .select( - "manifest_path", "manifest_length", "added_snapshot_id", "deleted_data_files_count"); - - private CloseableIterable readManifestFiles(Snapshot snapshot) { - if (snapshot.manifestListLocation() != null) { - return Avro.read(ops.io().newInputFile(snapshot.manifestListLocation())) - .rename("manifest_file", GenericManifestFile.class.getName()) - .classLoader(GenericManifestFile.class.getClassLoader()) - .project(MANIFEST_PROJECTION) - .reuseContainers(true) - .build(); - - } else { - return CloseableIterable.withNoopClose(snapshot.allManifests(ops.io())); - } + cleanupStrategy.cleanFiles(base, current); } } diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index 8174ed16285a..e8fb5e265813 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -39,13 +39,21 @@ @RunWith(Parameterized.class) public class TestRemoveSnapshots extends TableTestBase { - @Parameterized.Parameters(name = "formatVersion = {0}") + private final boolean incrementalCleanup; + + @Parameterized.Parameters(name = "formatVersion = {0}, incrementalCleanup = {1}") public static Object[] parameters() { - return new Object[] {1, 2}; + return new Object[][] { + new Object[] {1, true}, + new Object[] {2, true}, + new Object[] {1, false}, + new Object[] {2, false} + }; } - public TestRemoveSnapshots(int formatVersion) { + public TestRemoveSnapshots(int formatVersion, boolean incrementalCleanup) { super(formatVersion); + this.incrementalCleanup = incrementalCleanup; } private long waitUntilAfter(long timestampMillis) { @@ -72,7 +80,7 @@ public void testExpireOlderThan() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Expire should not change current snapshot", @@ -114,7 +122,7 @@ public void testExpireOlderThanWithDelete() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Expire should not change current snapshot", @@ -180,7 +188,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Expire should not change current snapshot", @@ -235,7 +243,7 @@ public void testExpireOlderThanWithRollback() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Expire should not change current snapshot", @@ -283,7 +291,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Expire should not change current snapshot", @@ -339,7 +347,7 @@ public void testRetainLastWithExpireOlderThan() { } // Retain last 2 snapshots - table.expireSnapshots().expireOlderThan(t3).retainLast(2).commit(); + removeSnapshots(table).expireOlderThan(t3).retainLast(2).commit(); Assert.assertEquals( "Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size()); @@ -381,7 +389,7 @@ public void testRetainLastWithExpireById() { } // Retain last 3 snapshots, but explicitly remove the first snapshot - table.expireSnapshots().expireSnapshotId(firstSnapshotId).retainLast(3).commit(); + removeSnapshots(table).expireSnapshotId(firstSnapshotId).retainLast(3).commit(); Assert.assertEquals( "Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size()); @@ -424,7 +432,7 @@ public void testRetainNAvailableSnapshotsWithTransaction() { // Retain last 2 snapshots Transaction tx = table.newTransaction(); - tx.expireSnapshots().expireOlderThan(t3).retainLast(2).commit(); + removeSnapshots(tx.table()).expireOlderThan(t3).retainLast(2).commit(); tx.commitTransaction(); Assert.assertEquals( @@ -459,7 +467,7 @@ public void testRetainLastWithTooFewSnapshots() { } // Retain last 3 snapshots - table.expireSnapshots().expireOlderThan(t2).retainLast(3).commit(); + removeSnapshots(table).expireOlderThan(t2).retainLast(3).commit(); Assert.assertEquals( "Should have two snapshots", 2, Lists.newArrayList(table.snapshots()).size()); @@ -504,7 +512,7 @@ public void testRetainNLargerThanCurrentSnapshots() { // Retain last 4 snapshots Transaction tx = table.newTransaction(); - tx.expireSnapshots().expireOlderThan(t3).retainLast(4).commit(); + removeSnapshots(tx.table()).expireOlderThan(t3).retainLast(4).commit(); tx.commitTransaction(); Assert.assertEquals( @@ -555,11 +563,7 @@ public void testRetainLastKeepsExpiringSnapshot() { } // Retain last 2 snapshots and expire older than t3 - table - .expireSnapshots() - .expireOlderThan(secondSnapshot.timestampMillis()) - .retainLast(2) - .commit(); + removeSnapshots(table).expireOlderThan(secondSnapshot.timestampMillis()).retainLast(2).commit(); Assert.assertEquals( "Should have three snapshots.", 3, Lists.newArrayList(table.snapshots()).size()); @@ -602,8 +606,7 @@ public void testExpireOlderThanMultipleCalls() { } // Retain last 2 snapshots and expire older than t3 - table - .expireSnapshots() + removeSnapshots(table) .expireOlderThan(secondSnapshot.timestampMillis()) .expireOlderThan(thirdSnapshot.timestampMillis()) .commit(); @@ -648,7 +651,7 @@ public void testRetainLastMultipleCalls() { } // Retain last 2 snapshots and expire older than t3 - table.expireSnapshots().expireOlderThan(t3).retainLast(2).retainLast(1).commit(); + removeSnapshots(table).expireOlderThan(t3).retainLast(2).retainLast(1).commit(); Assert.assertEquals( "Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size()); @@ -662,7 +665,7 @@ public void testRetainZeroSnapshots() { "Should fail retain 0 snapshots " + "because number of snapshots to retain cannot be zero", IllegalArgumentException.class, "Number of snapshots to retain must be at least 1, cannot be: 0", - () -> table.expireSnapshots().retainLast(0).commit()); + () -> removeSnapshots(table).retainLast(0).commit()); } @Test @@ -680,7 +683,7 @@ public void testScanExpiredManifestInValidSnapshotAppend() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); } @@ -706,7 +709,7 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); } @@ -743,7 +746,7 @@ public void dataFilesCleanup() throws IOException { Set deletedFiles = Sets.newHashSet(); - table.expireSnapshots().expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); @@ -784,8 +787,7 @@ public void dataFilesCleanupWithParallelTasks() throws IOException { AtomicInteger deleteThreadsIndex = new AtomicInteger(0); AtomicInteger planThreadsIndex = new AtomicInteger(0); - table - .expireSnapshots() + removeSnapshots(table) .executeDeleteWith( Executors.newFixedThreadPool( 4, @@ -843,8 +845,7 @@ public void noDataFileCleanup() throws IOException { Set deletedFiles = Sets.newHashSet(); - table - .expireSnapshots() + removeSnapshots(table) .cleanExpiredFiles(false) .expireOlderThan(t4) .deleteWith(deletedFiles::add) @@ -875,8 +876,7 @@ public void testWithExpiringDanglingStageCommit() { Set deletedFiles = Sets.newHashSet(); // Expire all commits including dangling staged snapshot. - table - .expireSnapshots() + removeSnapshots(table) .deleteWith(deletedFiles::add) .expireOlderThan(snapshotB.timestampMillis() + 1) .commit(); @@ -944,8 +944,7 @@ public void testWithCherryPickTableSnapshot() { List deletedFiles = Lists.newArrayList(); // Expire `C` - table - .expireSnapshots() + removeSnapshots(table) .deleteWith(deletedFiles::add) .expireOlderThan(snapshotC.timestampMillis() + 1) .commit(); @@ -990,8 +989,7 @@ public void testWithExpiringStagedThenCherrypick() { List deletedFiles = Lists.newArrayList(); // Expire `B` commit. - table - .expireSnapshots() + removeSnapshots(table) .deleteWith(deletedFiles::add) .expireSnapshotId(snapshotB.snapshotId()) .commit(); @@ -1008,8 +1006,7 @@ public void testWithExpiringStagedThenCherrypick() { }); // Expire all snapshots including cherry-pick - table - .expireSnapshots() + removeSnapshots(table) .deleteWith(deletedFiles::add) .expireOlderThan(table.currentSnapshot().timestampMillis() + 1) .commit(); @@ -1055,8 +1052,7 @@ public void testExpireWithDefaultRetainLast() { Snapshot snapshotBeforeExpiration = table.currentSnapshot(); - table - .expireSnapshots() + removeSnapshots(table) .expireOlderThan(System.currentTimeMillis()) .deleteWith(deletedFiles::add) .commit(); @@ -1091,7 +1087,7 @@ public void testExpireWithDefaultSnapshotAge() { Set deletedFiles = Sets.newHashSet(); // rely solely on default configs - table.expireSnapshots().deleteWith(deletedFiles::add).commit(); + removeSnapshots(table).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Should not change current snapshot", thirdSnapshot, table.currentSnapshot()); @@ -1144,11 +1140,7 @@ public void testExpireWithDeleteFiles() { long fourthSnapshotTs = waitUntilAfter(fourthSnapshot.timestampMillis()); Set deletedFiles = Sets.newHashSet(); - table - .expireSnapshots() - .expireOlderThan(fourthSnapshotTs) - .deleteWith(deletedFiles::add) - .commit(); + removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit(); Assert.assertEquals( "Should remove old delete files and delete file manifests", @@ -1190,7 +1182,7 @@ public void testTagExpiration() { waitUntilAfter(expirationTime); - table.expireSnapshots().cleanExpiredFiles(false).commit(); + removeSnapshots(table).cleanExpiredFiles(false).commit(); Assert.assertNull(table.ops().current().ref("tag")); Assert.assertNotNull(table.ops().current().ref("branch")); @@ -1217,7 +1209,7 @@ public void testBranchExpiration() { waitUntilAfter(expirationTime); - table.expireSnapshots().cleanExpiredFiles(false).commit(); + removeSnapshots(table).cleanExpiredFiles(false).commit(); Assert.assertNull(table.ops().current().ref("branch")); Assert.assertNotNull(table.ops().current().ref("tag")); @@ -1225,16 +1217,23 @@ public void testBranchExpiration() { } @Test - public void testMultipleRefsAndCleanExpiredFilesFails() { + public void testMultipleRefsAndCleanExpiredFilesFailsForIncrementalCleanup() { table.newAppend().appendFile(FILE_A).commit(); - + table.newDelete().deleteFile(FILE_A).commit(); table.manageSnapshots().createTag("TagA", table.currentSnapshot().snapshotId()).commit(); + waitUntilAfter(table.currentSnapshot().timestampMillis()); + RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots(); AssertHelpers.assertThrows( "Should fail removing snapshots and files when there is more than 1 ref", UnsupportedOperationException.class, "Cannot incrementally clean files for tables with more than 1 ref", - () -> table.expireSnapshots().cleanExpiredFiles(true).commit()); + () -> + removeSnapshots + .withIncrementalCleanup(true) + .expireOlderThan(table.currentSnapshot().timestampMillis()) + .cleanExpiredFiles(true) + .commit()); } @Test @@ -1253,7 +1252,7 @@ public void testFailRemovingSnapshotWhenStillReferencedByBranch() { "Should fail removing snapshot when it is still referenced", IllegalArgumentException.class, "Cannot expire 2. Still referenced by refs: [branch]", - () -> table.expireSnapshots().expireSnapshotId(snapshotId).commit()); + () -> removeSnapshots(table).expireSnapshotId(snapshotId).commit()); } @Test @@ -1271,7 +1270,7 @@ public void testFailRemovingSnapshotWhenStillReferencedByTag() { "Should fail removing snapshot when it is still referenced", IllegalArgumentException.class, "Cannot expire 1. Still referenced by refs: [tag]", - () -> table.expireSnapshots().expireSnapshotId(snapshotId).commit()); + () -> removeSnapshots(table).expireSnapshotId(snapshotId).commit()); } @Test @@ -1285,7 +1284,7 @@ public void testRetainUnreferencedSnapshotsWithinExpirationAge() { table.newAppend().appendFile(FILE_C).commit(); - table.expireSnapshots().expireOlderThan(expireTimestampSnapshotA).commit(); + removeSnapshots(table).expireOlderThan(expireTimestampSnapshotA).commit(); Assert.assertEquals(2, table.ops().current().snapshots().size()); } @@ -1314,8 +1313,7 @@ public void testUnreferencedSnapshotParentOfTag() { .replaceBranch("main", initialSnapshotId) .commit(); - table - .expireSnapshots() + removeSnapshots(table) .expireOlderThan(expireTimestampSnapshotB) .cleanExpiredFiles(false) .commit(); @@ -1352,8 +1350,7 @@ public void testSnapshotParentOfBranchNotUnreferenced() { .replaceBranch("main", initialSnapshotId) .commit(); - table - .expireSnapshots() + removeSnapshots(table) .expireOlderThan(expireTimestampSnapshotB) .cleanExpiredFiles(false) .commit(); @@ -1362,8 +1359,6 @@ public void testSnapshotParentOfBranchNotUnreferenced() { Assert.assertEquals(3, table.ops().current().snapshots().size()); } - // ToDo: Add tests which commit to branches once committing snapshots to a branch is supported - @Test public void testMinSnapshotsToKeepMultipleBranches() { table.newAppend().appendFile(FILE_A).commit(); @@ -1404,7 +1399,7 @@ public void testMinSnapshotsToKeepMultipleBranches() { // stop retaining snapshots from the branch table.manageSnapshots().setMinSnapshotsToKeep("branch", 1).commit(); - table.expireSnapshots().cleanExpiredFiles(false).commit(); + removeSnapshots(table).cleanExpiredFiles(false).commit(); Assert.assertEquals( "Should have 2 snapshots (initial removed)", 2, Iterables.size(table.snapshots())); @@ -1445,7 +1440,7 @@ public void testMaxSnapshotAgeMultipleBranches() { .setMaxSnapshotAgeMs("branch", Long.MAX_VALUE) .commit(); - table.expireSnapshots().cleanExpiredFiles(false).commit(); + removeSnapshots(table).cleanExpiredFiles(false).commit(); Assert.assertEquals( "Should have 3 snapshots (none removed)", 3, Iterables.size(table.snapshots())); @@ -1459,4 +1454,9 @@ public void testMaxSnapshotAgeMultipleBranches() { "Should have 2 snapshots (initial removed)", 2, Iterables.size(table.snapshots())); Assert.assertNull(table.ops().current().snapshot(initialSnapshotId)); } + + private RemoveSnapshots removeSnapshots(Table table) { + RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots(); + return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup); + } }