diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index 430813294bf2..a683e666fb2f 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -555,6 +555,10 @@ public static Builder<Integer> range(int upTo) {
     return new Builder<>(new Range(upTo));
   }
 
+  public static <I> Builder<I> foreach(Iterator<I> items) {
+    return new Builder<>(() -> items);
+  }
+
   public static <I> Builder<I> foreach(Iterable<I> items) {
     return new Builder<>(items);
   }
diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java b/spark/src/main/java/org/apache/iceberg/actions/Actions.java
index 1728052e66a3..01a2b84ffa74 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/Actions.java
@@ -51,4 +51,8 @@ public RewriteManifestsAction rewriteManifests() {
   public RewriteDataFilesAction rewriteDataFiles() {
     return new RewriteDataFilesAction(spark, table);
   }
+
+  public ExpireSnapshotsAction expireSnapshots() {
+    return new ExpireSnapshotsAction(spark, table);
+  }
 }
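The new `foreach(Iterator)` overload above is what lets the expire action below stream `Dataset#toLocalIterator` results straight into the `Tasks` retry framework without materializing them as a `List` first. A minimal sketch of that pattern, using the same builder calls this diff relies on (the class, method, and variable names here are illustrative, not part of the diff):

```java
import java.util.Iterator;
import org.apache.iceberg.util.Tasks;

public class ForeachIteratorSketch {
  // Consumes a streaming source of paths one element at a time.
  static void deleteAll(Iterator<String> paths) {
    Tasks.foreach(paths)
        .retry(3)                      // retry each delete up to 3 times
        .suppressFailureWhenFinished() // log and continue instead of aborting the run
        .run(path -> System.out.println("would delete " + path));
  }
}
```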
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
index f27015d40dc2..570fb5955986 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -19,8 +19,17 @@
 package org.apache.iceberg.actions;
 
+import java.util.List;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 
 abstract class BaseAction<R> implements Action<R> {
 
@@ -41,4 +50,66 @@ protected String metadataTableName(MetadataTableType type) {
       return tableName + "." + type;
     }
   }
+
+  /**
+   * Returns the locations of all manifest lists for a given table.
+   *
+   * @param table the table
+   * @return the paths of the manifest lists
+   */
+  protected List<String> getManifestListPaths(Table table) {
+    List<String> manifestLists = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      String manifestListLocation = snapshot.manifestListLocation();
+      if (manifestListLocation != null) {
+        manifestLists.add(manifestListLocation);
+      }
+    }
+    return manifestLists;
+  }
+
+  /**
+   * Returns metadata file paths that may not be referenced by the current table metadata. Specifically,
+   * this includes "version-hint" files as well as entries in metadata.previousFiles.
+   *
+   * @param ops TableOperations of the table to get paths from
+   * @return a list of paths to metadata files
+   */
+  protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
+    List<String> otherMetadataFiles = Lists.newArrayList();
+    otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
+
+    TableMetadata metadata = ops.current();
+    otherMetadataFiles.add(metadata.metadataFileLocation());
+    for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
+      otherMetadataFiles.add(previousMetadataFile.file());
+    }
+    return otherMetadataFiles;
+  }
+
+  protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
+    String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
+    return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
+  }
+
+  protected Dataset<Row> buildManifestFileDF(SparkSession spark) {
+    String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS);
+    return spark.read().format("iceberg").load(allManifestsMetadataTable).selectExpr("path as file_path");
+  }
+
+  protected Dataset<Row> buildManifestListDF(SparkSession spark, Table table) {
+    List<String> manifestLists = getManifestListPaths(table);
+    return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path");
+  }
+
+  protected Dataset<Row> buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) {
+    List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
+    return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
+  }
+
+  protected Dataset<Row> buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) {
+    Dataset<Row> manifestDF = buildManifestFileDF(spark);
+    Dataset<Row> manifestListDF = buildManifestListDF(spark, table);
+    Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(spark, ops);
+
+    return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
+  }
 }
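Taken together, the helpers hoisted into `BaseAction` describe every file reachable from a table's metadata. A minimal sketch of how a subclass might combine them into one listing (a hypothetical method, not part of this diff; it assumes the surrounding class extends `BaseAction`):

```java
// Unions the live data files with the manifests, manifest lists, and metadata
// json files into a single DataFrame keyed by file_path.
protected Dataset<Row> buildAllReachableFileDF(SparkSession spark, Table table, TableOperations ops) {
  return buildValidDataFileDF(spark)
      .union(buildValidMetadataFileDF(spark, table, ops));
}
```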
diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
new file mode 100644
index 000000000000..79fc00ad2486
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action that performs the same operation as {@link ExpireSnapshots} but uses Spark
+ * to determine the delta in files between the pre- and post-expiration table metadata. All of the
+ * same restrictions of {@link ExpireSnapshots} also apply to this action.
+ * <p>
+ * This implementation uses the metadata tables for the table being expired to list all manifest and
+ * data files. These are loaded into a DataFrame, which is anti-joined with the same list read after
+ * the expiration. This operation requires a shuffle, so parallelism can be controlled through
+ * spark.sql.shuffle.partitions. The expiration itself is done locally using a direct call to
+ * RemoveSnapshots. The snapshot expiration is fully committed before any deletes are issued.
+ * Deletes are still performed locally after retrieving the results from the Spark executors.
+ */
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotsActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private static final String DATA_FILE = "Data File";
+  private static final String MANIFEST = "Manifest";
+  private static final String MANIFEST_LIST = "Manifest List";
+
+  // a null executor service causes each delete to run in the thread that invokes execute/submit
+  private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
+
+  private final SparkSession spark;
+  private final Table table;
+  private final TableOperations ops;
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+
+  private Set<Long> expireSnapshotIdValues = Sets.newHashSet();
+  private Long expireOlderThanValue = null;
+  private Integer retainLastValue = null;
+  private Consumer<String> deleteFunc = defaultDelete;
+  private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
+
+  ExpireSnapshotsAction(SparkSession spark, Table table) {
+    this.spark = spark;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * An executor service used when deleting files. Only used during the local delete phase of this
+   * Spark action. Similar to {@link ExpireSnapshots#executeWith(ExecutorService)}.
+   *
+   * @param executorService the service to use
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) {
+    this.deleteExecutorService = executorService;
+    return this;
+  }
+
+  /**
+   * Expires a specific snapshot identified by id.
+   * Identical to {@link ExpireSnapshots#expireSnapshotId(long)}.
+   *
+   * @param expireSnapshotId id of the snapshot to expire
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    this.expireSnapshotIdValues.add(expireSnapshotId);
+    return this;
+  }
+
+  /**
+   * Expires all snapshots older than a given timestamp.
+   * Identical to {@link ExpireSnapshots#expireOlderThan(long)}.
+   *
+   * @param timestampMillis all snapshots before this time will be expired
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    this.expireOlderThanValue = timestampMillis;
+    return this;
+  }
+
+  /**
+   * Retains at least the given number of snapshots when expiring.
+   * Identical to {@link ExpireSnapshots#retainLast(int)}.
+   *
+   * @param numSnapshots the number of snapshots to retain
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    Preconditions.checkArgument(1 <= numSnapshots,
+        "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
+    this.retainLastValue = numSnapshots;
+    return this;
+  }
+
+  /**
+   * The Consumer used on files determined to be expired. By default, this deletes files using the
+   * table's FileIO. Identical to {@link ExpireSnapshots#deleteWith(Consumer)}.
+   *
+   * @param newDeleteFunc a Consumer that takes a path and deletes it
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    this.deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+  @Override
+  public ExpireSnapshotsActionResult execute() {
+    Dataset<Row> originalFiles = null;
+    try {
+      // metadata before expiration
+      originalFiles = buildValidFileDF().persist();
+      // action to trigger persist
+      originalFiles.count();
+
+      // perform expiration
+      ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
+      for (final Long id : expireSnapshotIdValues) {
+        expireSnaps = expireSnaps.expireSnapshotId(id);
+      }
+
+      if (expireOlderThanValue != null) {
+        expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
+      }
+
+      if (retainLastValue != null) {
+        expireSnaps = expireSnaps.retainLast(retainLastValue);
+      }
+
+      expireSnaps.commit();
+
+      // metadata after expiration
+      Dataset<Row> validFiles = buildValidFileDF();
+      Dataset<Row> filesToDelete = originalFiles.except(validFiles);
+
+      return deleteFiles(filesToDelete.toLocalIterator());
+    } finally {
+      if (originalFiles != null) {
+        originalFiles.unpersist();
+      }
+    }
+  }
+
+  private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
+    return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
+  }
+
+  private Dataset<Row> buildValidFileDF() {
+    return appendTypeString(buildValidDataFileDF(spark), DATA_FILE)
+        .union(appendTypeString(buildManifestFileDF(spark), MANIFEST))
+        .union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST));
+  }
+
+  /**
+   * Deletes files passed to it based on their type.
+   *
+   * @param expiredFiles an Iterator of Spark Rows of the structure (path: String, type: String)
+   * @return statistics on which files were deleted
+   */
+  private ExpireSnapshotsActionResult deleteFiles(Iterator<Row> expiredFiles) {
+    AtomicLong dataFileCount = new AtomicLong(0L);
+    AtomicLong manifestCount = new AtomicLong(0L);
+    AtomicLong manifestListCount = new AtomicLong(0L);
+
+    Tasks.foreach(expiredFiles)
+        .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
+        .executeWith(deleteExecutorService)
+        .onFailure((fileInfo, exc) ->
+            LOG.warn("Delete failed for {}: {}", fileInfo.getString(1), fileInfo.getString(0), exc))
+        .run(fileInfo -> {
+          String file = fileInfo.getString(0);
+          String type = fileInfo.getString(1);
+          deleteFunc.accept(file);
+          switch (type) {
+            case DATA_FILE:
+              dataFileCount.incrementAndGet();
+              LOG.trace("Deleted Data File: {}", file);
+              break;
+            case MANIFEST:
+              manifestCount.incrementAndGet();
+              LOG.debug("Deleted Manifest: {}", file);
+              break;
+            case MANIFEST_LIST:
+              manifestListCount.incrementAndGet();
+              LOG.debug("Deleted Manifest List: {}", file);
+              break;
+          }
+        });
+    LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
+    return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
+  }
+}
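A usage sketch for the new action, based on the tests later in this diff (the seven-day cutoff and the way `table` is obtained are assumptions for illustration; the builder calls are the ones defined above):

```java
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.ExpireSnapshotsActionResult;

public class ExpireExample {
  static ExpireSnapshotsActionResult expireLastWeek(Table table) {
    return Actions.forTable(table)
        .expireSnapshots()
        .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
        .retainLast(2) // never drop below two snapshots, even if all are older than the cutoff
        .execute();
  }
}
```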
diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java
new file mode 100644
index 000000000000..adb74bcdfc96
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class ExpireSnapshotsActionResult {
+
+  private final Long dataFilesDeleted;
+  private final Long manifestFilesDeleted;
+  private final Long manifestListsDeleted;
+
+  public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) {
+    this.dataFilesDeleted = dataFilesDeleted;
+    this.manifestFilesDeleted = manifestFilesDeleted;
+    this.manifestListsDeleted = manifestListsDeleted;
+  }
+
+  public Long dataFilesDeleted() {
+    return dataFilesDeleted;
+  }
+
+  public Long manifestFilesDeleted() {
+    return manifestFilesDeleted;
+  }
+
+  public Long manifestListsDeleted() {
+    return manifestListsDeleted;
+  }
+}
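The result object is a plain holder of `Long` counts, so totals are straightforward to derive. A small sketch (it assumes `result` came from an `execute()` call like the one shown earlier):

```java
long totalFilesRemoved = result.dataFilesDeleted()
    + result.manifestFilesDeleted()
    + result.manifestListsDeleted();
System.out.printf("expired %d files in total%n", totalFilesRemoved);
```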
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
index d918d5dc156f..13684fa4b7b0 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
@@ -31,10 +31,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
@@ -148,8 +145,8 @@ public RemoveOrphanFilesAction deleteWith(Consumer<String> newDeleteFunc) {
 
   @Override
   public List<String> execute() {
-    Dataset<Row> validDataFileDF = buildValidDataFileDF();
-    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF();
+    Dataset<Row> validDataFileDF = buildValidDataFileDF(spark);
+    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(spark, table, ops);
     Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
 
     Dataset<Row> actualFileDF = buildActualFileDF();
@@ -170,43 +167,6 @@ public List<String> execute() {
     return orphanFiles;
   }
 
-  private Dataset<Row> buildValidDataFileDF() {
-    String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
-    return spark.read().format("iceberg")
-        .load(allDataFilesMetadataTable)
-        .select("file_path");
-  }
-
-  private Dataset<Row> buildValidMetadataFileDF() {
-    String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS);
-    Dataset<Row> manifestDF = spark.read().format("iceberg")
-        .load(allManifestsMetadataTable)
-        .selectExpr("path as file_path");
-
-    List<String> otherMetadataFiles = Lists.newArrayList();
-
-    for (Snapshot snapshot : table.snapshots()) {
-      String manifestListLocation = snapshot.manifestListLocation();
-      if (manifestListLocation != null) {
-        otherMetadataFiles.add(manifestListLocation);
-      }
-    }
-
-    otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
-
-    TableMetadata metadata = ops.current();
-    otherMetadataFiles.add(metadata.metadataFileLocation());
-    for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
-      otherMetadataFiles.add(previousMetadataFile.file());
-    }
-
-    Dataset<Row> otherMetadataFileDF = spark
-        .createDataset(otherMetadataFiles, Encoders.STRING())
-        .toDF("file_path");
-
-    return manifestDF.union(otherMetadataFileDF);
-  }
-
   private Dataset<Row> buildActualFileDF() {
     List<String> subDirs = Lists.newArrayList();
     List<String> matchingFiles = Lists.newArrayList();
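With the listing helpers shared through `BaseAction`, the orphan-file action keeps only the logic specific to it: listing the files actually present under the table location and subtracting the reachable set. Conceptually that subtraction is a left-anti join; an illustrative sketch only, since the real implementation differs in details such as path normalization:

```java
// Files present on disk (actualFileDF) but absent from table metadata (validFileDF).
Dataset<Row> orphanFileDF = actualFileDF.join(validFileDF,
    actualFileDF.col("file_path").equalTo(validFileDF.col("file_path")),
    "left_anti");
```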
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
new file mode 100644
index 000000000000..7a44ae1f9d12
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public abstract class TestExpireSnapshotsAction extends SparkTestBase {
+
+  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "c1", Types.IntegerType.get()),
+      optional(2, "c2", Types.StringType.get()),
+      optional(3, "c3", Types.StringType.get())
+  );
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+  static final DataFile FILE_A = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-a.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("c1=0") // easy way to set partition data for now
+      .withRecordCount(1)
+      .build();
+  static final DataFile FILE_B = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-b.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("c1=1") // easy way to set partition data for now
+      .withRecordCount(1)
+      .build();
+  static final DataFile FILE_C = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-c.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("c1=2") // easy way to set partition data for now
+      .withRecordCount(1)
+      .build();
+  static final DataFile FILE_D = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-d.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("c1=3") // easy way to set partition data for now
+      .withRecordCount(1)
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private File tableDir;
+  private String tableLocation;
+  private Table table;
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    this.tableDir = temp.newFolder();
+    this.tableLocation = tableDir.toURI().toString();
+    this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+  }
+
+  private Long rightAfterSnapshot() {
+    Long end = System.currentTimeMillis();
+    while (end <= table.currentSnapshot().timestampMillis()) {
+      end = System.currentTimeMillis();
+    }
+    return end;
+  }
+
+  private void checkExpirationResults(Long expectedDatafiles, Long expectedManifestsDeleted,
+      Long expectedManifestListsDeleted, ExpireSnapshotsActionResult results) {
+
+    Assert.assertEquals("Incorrect number of manifest files deleted",
+        expectedManifestsDeleted, results.manifestFilesDeleted());
+    Assert.assertEquals("Incorrect number of datafiles deleted",
+        expectedDatafiles, results.dataFilesDeleted());
+    Assert.assertEquals("Incorrect number of manifest lists deleted",
+        expectedManifestListsDeleted, results.manifestListsDeleted());
+  }
+
+  @Test
+  public void testFilesCleaned() throws Exception {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newOverwrite()
+        .deleteFile(FILE_A)
+        .addFile(FILE_B)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    long end = rightAfterSnapshot();
+
+    ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute();
+
+    Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots()));
+
+    checkExpirationResults(1L, 1L, 2L, results);
+  }
+
+  @Test
+  public void dataFilesCleanupWithParallelTasks() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D))
+        .commit();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C))
+        .commit();
+
+    long t4 = rightAfterSnapshot();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
+    AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
+
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement());
+          thread.setDaemon(true); // daemon threads will be terminated abruptly when the JVM exits
+          return thread;
+        }))
+        .expireOlderThan(t4)
+        .deleteWith(s -> {
+          deleteThreads.add(Thread.currentThread().getName());
+          deletedFiles.add(s);
+        })
+        .execute();
+
+    // verify that the delete methods ran in the threads created by the provided ExecutorService ThreadFactory
+    Assert.assertEquals(
+        Sets.newHashSet("remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3"),
+        deleteThreads);
+
+    Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+    Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
+
+    checkExpirationResults(2L, 3L, 3L, result);
+  }
+
+  @Test
+  public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().execute();
+    checkExpirationResults(0L, 0L, 0L, results);
+  }
+
+  @Test
+  public void testCleanupRepeatedOverwrites() throws Exception {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    for (int i = 0; i < 10; i++) {
+      table.newOverwrite()
+          .deleteFile(FILE_A)
+          .addFile(FILE_B)
+          .commit();
+
+      table.newOverwrite()
+          .deleteFile(FILE_B)
+          .addFile(FILE_A)
+          .commit();
+    }
+
+    long end = rightAfterSnapshot();
+    ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute();
+    checkExpirationResults(1L, 39L, 20L, results);
+  }
+
+  @Test
+  public void testRetainLastWithExpireOlderThan() {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+    long t1 = System.currentTimeMillis();
+    while (t1 <= table.currentSnapshot().timestampMillis()) {
+      t1 = System.currentTimeMillis();
+    }
+
+    table.newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    long t3 = rightAfterSnapshot();
+
+    // retain the last 2 snapshots
+    Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(t3)
+        .retainLast(2)
+        .execute();
+
+    Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertNull("First snapshot should not be present.", table.snapshot(firstSnapshotId));
+  }
+
+  @Test
+  public void testExpireTwoSnapshotsById() throws Exception {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    long secondSnapshotID = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    // expire the first and second snapshots by id
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireSnapshotId(firstSnapshotId)
+        .expireSnapshotId(secondSnapshotID)
+        .execute();
+
+    Assert.assertEquals("Should have one snapshot.", 1, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertNull("First snapshot should not be present.", table.snapshot(firstSnapshotId));
+    Assert.assertNull("Second snapshot should not be present.", table.snapshot(secondSnapshotID));
+
+    checkExpirationResults(0L, 0L, 2L, result);
+  }
+
+  @Test
+  public void testRetainLastWithExpireById() {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    // retain the last 3 snapshots, but explicitly remove the first snapshot
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireSnapshotId(firstSnapshotId)
+        .retainLast(3)
+        .execute();
+
+    Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertNull("First snapshot should not be present.", table.snapshot(firstSnapshotId));
+    checkExpirationResults(0L, 0L, 1L, result);
+  }
+
+  @Test
+  public void testRetainLastWithTooFewSnapshots() {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    long t2 = rightAfterSnapshot();
+
+    // retain the last 3 snapshots
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(t2)
+        .retainLast(3)
+        .execute();
+
+    Assert.assertEquals("Should have two snapshots", 2, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertEquals("First snapshot should still be present",
+        firstSnapshotId, table.snapshot(firstSnapshotId).snapshotId());
+    checkExpirationResults(0L, 0L, 0L, result);
+  }
+
+  @Test
+  public void testRetainLastKeepsExpiringSnapshot() {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_D) // data_bucket=3
+        .commit();
+
+    // retain the last 2 snapshots and expire older than the second snapshot's timestamp
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(secondSnapshot.timestampMillis())
+        .retainLast(2)
+        .execute();
+
+    Assert.assertEquals("Should have three snapshots.", 3, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertNotNull("Second snapshot should be present.", table.snapshot(secondSnapshot.snapshotId()));
+    checkExpirationResults(0L, 0L, 1L, result);
+  }
+
+  @Test
+  public void testExpireOlderThanMultipleCalls() {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    Snapshot thirdSnapshot = table.currentSnapshot();
+
+    // expireOlderThan is called twice; the second timestamp wins
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(secondSnapshot.timestampMillis())
+        .expireOlderThan(thirdSnapshot.timestampMillis())
+        .execute();
+
+    Assert.assertEquals("Should have one snapshot.", 1, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertNull("Second snapshot should not be present.", table.snapshot(secondSnapshot.snapshotId()));
+    checkExpirationResults(0L, 0L, 2L, result);
+  }
+
+  @Test
+  public void testRetainLastMultipleCalls() {
+    table.newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    table.newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    long t3 = rightAfterSnapshot();
+
+    // retainLast is called twice; the second value (1) wins
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(t3)
+        .retainLast(2)
+        .retainLast(1)
+        .execute();
+
+    Assert.assertEquals("Should have one snapshot.", 1, Lists.newArrayList(table.snapshots()).size());
+    Assert.assertNull("Second snapshot should not be present.", table.snapshot(secondSnapshot.snapshotId()));
+    checkExpirationResults(0L, 0L, 2L, result);
+  }
+
+  @Test
+  public void testRetainZeroSnapshots() {
+    AssertHelpers.assertThrows("Should fail to retain 0 snapshots " +
+            "because the number of snapshots to retain cannot be zero",
+        IllegalArgumentException.class,
+        "Number of snapshots to retain must be at least 1, cannot be: 0",
+        () -> Actions.forTable(table).expireSnapshots().retainLast(0).execute());
+  }
+
+  @Test
+  public void testScanExpiredManifestInValidSnapshotAppend() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newOverwrite()
+        .addFile(FILE_C)
+        .deleteFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    long t3 = rightAfterSnapshot();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(t3)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+    checkExpirationResults(1L, 1L, 2L, result);
+  }
+
+  @Test
+  public void testScanExpiredManifestInValidSnapshotFastAppend() {
+    table.updateProperties()
+        .set(TableProperties.MANIFEST_MERGE_ENABLED, "true")
+        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1")
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newOverwrite()
+        .addFile(FILE_C)
+        .deleteFile(FILE_A)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    long t3 = rightAfterSnapshot();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .expireOlderThan(t3)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+    checkExpirationResults(1L, 1L, 2L, result);
+  }
+
+  /**
+   * Test on the table below, expiring the staged commit `B` using the `expireOlderThan` API.
+   *
+   * Table: A - C
+   *          ` B (staged)
+   */
+  @Test
+  public void testWithExpiringDanglingStageCommit() {
+    // `A` commit
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    // `B` staged commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .stageOnly()
+        .commit();
+
+    TableMetadata base = ((BaseTable) table).operations().current();
+    Snapshot snapshotA = base.snapshots().get(0);
+    Snapshot snapshotB = base.snapshots().get(1);
+
+    // `C` commit
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Set<String> deletedFiles = new HashSet<>();
+
+    // expire all commits, including the dangling staged snapshot
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .deleteWith(deletedFiles::add)
+        .expireOlderThan(snapshotB.timestampMillis() + 1)
+        .execute();
+
+    checkExpirationResults(1L, 1L, 2L, result);
+
+    Set<String> expectedDeletes = new HashSet<>();
+    expectedDeletes.add(snapshotA.manifestListLocation());
+
+    // the files of the dangling staged snapshot should be deleted
+    snapshotB.addedFiles().forEach(i -> {
+      expectedDeletes.add(i.path().toString());
+    });
+
+    // the manifest list should be deleted too
+    expectedDeletes.add(snapshotB.manifestListLocation());
+    snapshotB.dataManifests().forEach(file -> {
+      // only the manifest of B should be deleted
+      if (file.snapshotId() == snapshotB.snapshotId()) {
+        expectedDeletes.add(file.path());
+      }
+    });
+    Assert.assertEquals("Files deleted count should match", expectedDeletes.size(), deletedFiles.size());
+    // take the diff
+    expectedDeletes.removeAll(deletedFiles);
+    Assert.assertTrue("Exactly the same files should be deleted", expectedDeletes.isEmpty());
+  }
+
+  /**
+   * Expire a cherry-picked commit as shown below, when `B` is in the table's current state.
+   *
+   * Table: A - B - C <-- current snapshot
+   *             `- D (source=B)
+   */
+  @Test
+  public void testWithCherryPickTableSnapshot() {
+    // `A` commit
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    Snapshot snapshotA = table.currentSnapshot();
+
+    // `B` commit
+    Set<String> deletedAFiles = new HashSet<>();
+    table.newOverwrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .deleteWith(deletedAFiles::add)
+        .commit();
+    Assert.assertTrue("No files should be physically deleted", deletedAFiles.isEmpty());
+
+    // pick the snapshot `B`
+    Snapshot snapshotB = table.currentSnapshot();
+
+    // `C` commit to let the cherry-pick take effect and avoid a fast-forward of `B`
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+    Snapshot snapshotC = table.currentSnapshot();
+
+    // move the table back to `A`
+    table.manageSnapshots()
+        .setCurrentSnapshot(snapshotA.snapshotId())
+        .commit();
+
+    // generate A -> `D (B)`
+    table.manageSnapshots()
+        .cherrypick(snapshotB.snapshotId())
+        .commit();
+    Snapshot snapshotD = table.currentSnapshot();
+
+    // move the table back to `C`
+    table.manageSnapshots()
+        .setCurrentSnapshot(snapshotC.snapshotId())
+        .commit();
+    List<String> deletedFiles = new ArrayList<>();
+
+    // expire `C`
+    ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots()
+        .deleteWith(deletedFiles::add)
+        .expireOlderThan(snapshotC.timestampMillis() + 1)
+        .execute();
+
+    // make sure no data files are deleted for the B, C, or D snapshots
+    Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
+      i.addedFiles().forEach(item -> {
+        Assert.assertFalse(deletedFiles.contains(item.path().toString()));
+      });
+    });
+
+    checkExpirationResults(1L, 2L, 2L, result);
+  }
+
+  /**
+   * Test on the table below, expiring `B` which is not in the current table state.
+   * 1) Expire `B`
+   * 2) Expire all commits
+   *
+   * Table: A - C - D (B)
+   *          ` B (staged)
+   */
+  @Test
+  public void testWithExpiringStagedThenCherrypick() {
+    // `A` commit
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    // `B` commit
+    table.newAppend()
+        .appendFile(FILE_B)
+        .stageOnly()
+        .commit();
+
+    // pick the snapshot that's staged but not committed
+    TableMetadata base = ((BaseTable) table).operations().current();
+    Snapshot snapshotB = base.snapshots().get(1);
+
+    // `C` commit to let the cherry-pick take effect and avoid a fast-forward of `B`
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    // `D (B)` cherry-pick commit
+    table.manageSnapshots()
+        .cherrypick(snapshotB.snapshotId())
+        .commit();
+
+    base = ((BaseTable) table).operations().current();
+    Snapshot snapshotD = base.snapshots().get(3);
+
+    List<String> deletedFiles = new ArrayList<>();
+
+    // expire the `B` commit
+    ExpireSnapshotsActionResult firstResult = Actions.forTable(table).expireSnapshots()
+        .deleteWith(deletedFiles::add)
+        .expireSnapshotId(snapshotB.snapshotId())
+        .execute();
+
+    // make sure no data files are deleted for the staged snapshot
+    Lists.newArrayList(snapshotB).forEach(i -> {
+      i.addedFiles().forEach(item -> {
+        Assert.assertFalse(deletedFiles.contains(item.path().toString()));
+      });
+    });
+    checkExpirationResults(0L, 1L, 1L, firstResult);
+
+    // expire all snapshots, including the cherry-pick
+    ExpireSnapshotsActionResult secondResult = Actions.forTable(table).expireSnapshots()
+        .deleteWith(deletedFiles::add)
+        .expireOlderThan(table.currentSnapshot().timestampMillis() + 1)
+        .execute();
+
+    // make sure no data files are deleted for the staged and cherry-picked snapshots
+    Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
+      i.addedFiles().forEach(item -> {
+        Assert.assertFalse(deletedFiles.contains(item.path().toString()));
+      });
+    });
+    checkExpirationResults(0L, 0L, 2L, secondResult);
+  }
+}
diff --git a/spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java b/spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java
new file mode 100644
index 000000000000..d613e09d7175
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public class TestExpireSnapshotsAction24 extends TestExpireSnapshotsAction {
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java b/spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java
new file mode 100644
index 000000000000..a05ed3f5664e
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import org.apache.iceberg.actions.TestExpireSnapshotsAction;
+
+public class TestExpireSnapshotsAction3 extends TestExpireSnapshotsAction {
+}
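For reference, an end-to-end sketch that mirrors the test setup above (the warehouse path is illustrative; the tests use a `TemporaryFolder` and `DataFiles.builder` instead):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.ExpireSnapshotsActionResult;
import org.apache.iceberg.hadoop.HadoopTables;

public class ExpireEndToEnd {
  public static void main(String[] args) {
    HadoopTables tables = new HadoopTables(new Configuration());
    Table table = tables.load("file:/tmp/warehouse/db/tbl"); // hypothetical table location

    ExpireSnapshotsActionResult result = Actions.forTable(table)
        .expireSnapshots()
        .expireOlderThan(System.currentTimeMillis()) // everything not protected by retainLast
        .retainLast(1)
        .execute();

    System.out.printf("data files: %d, manifests: %d, manifest lists: %d%n",
        result.dataFilesDeleted(), result.manifestFilesDeleted(), result.manifestListsDeleted());
  }
}
```

Because the snapshot expiration commits before any deletes are issued, a failure during the delete phase leaves the table metadata consistent; at worst, unreferenced files are left behind for a later cleanup.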