From 896c6cf4ad34fa273287d093cebe189fdeb4c27b Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Wed, 5 Aug 2020 11:29:49 -0500 Subject: [PATCH 1/6] Expire Snapshots Spark Action Previously the only way to expire snapshots was through a single-machine table operation with RemoveSnapshots. In this patch we add a new Spark Action which does the same work, but does so in a scalable way. Instead of using the old logic for analyzing files to remove, we use the Metadata Table representations of the table both before and after Snapshot Expiration to determine unneeded files. --- .../org/apache/iceberg/actions/Actions.java | 4 + .../apache/iceberg/actions/BaseAction.java | 65 ++++++ .../actions/ExpireSnapshotActionResult.java | 52 +++++ .../actions/ExpireSnapshotsAction.java | 156 +++++++++++++ .../actions/RemoveOrphanFilesAction.java | 44 +--- .../actions/TestExpireSnapshotsAction.java | 210 ++++++++++++++++++ .../actions/TestExpireSnapshotsAction24.java | 23 ++ .../actions/TestExpireSnapshotsAction3.java | 25 +++ 8 files changed, 537 insertions(+), 42 deletions(-) create mode 100644 spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java create mode 100644 spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java create mode 100644 spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java create mode 100644 spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java b/spark/src/main/java/org/apache/iceberg/actions/Actions.java index 1728052e66a3..01a2b84ffa74 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java +++ b/spark/src/main/java/org/apache/iceberg/actions/Actions.java @@ -51,4 +51,8 @@ public RewriteManifestsAction rewriteManifests() { public RewriteDataFilesAction rewriteDataFiles() { return new RewriteDataFilesAction(spark, table); } + + public ExpireSnapshotsAction expireSnapshots() { + return new ExpireSnapshotsAction(spark, table); + } } diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java index f27015d40dc2..1059f40f4060 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java @@ -19,8 +19,18 @@ package org.apache.iceberg.actions; +import java.util.List; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + abstract class BaseAction implements Action { @@ -41,4 +51,59 @@ protected String metadataTableName(MetadataTableType type) { return tableName + "."
+ type; } } + + protected Dataset buildValidDataFileDF(SparkSession spark) { + String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES); + return spark.read().format("iceberg") + .load(allDataFilesMetadataTable) + .select("file_path"); + } + + protected Dataset buildManifestFileDF(SparkSession spark) { + String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS); + Dataset manifestDF = spark.read().format("iceberg") + .load(allManifestsMetadataTable) + .selectExpr("path as file_path"); + return manifestDF; + } + + protected Dataset buildManifestListDF(SparkSession spark, Table table) { + List manifestLists = Lists.newArrayList(); + + for (Snapshot snapshot : table.snapshots()) { + String manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null) { + manifestLists.add(manifestListLocation); + } + } + + Dataset manifestListDF = spark + .createDataset(manifestLists, Encoders.STRING()) + .toDF("file_path"); + + return manifestListDF; + } + + protected Dataset buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) { + List otherMetadataFiles = Lists.newArrayList(); + otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); + TableMetadata metadata = ops.current(); + otherMetadataFiles.add(metadata.metadataFileLocation()); + for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { + otherMetadataFiles.add(previousMetadataFile.file()); + } + + Dataset otherMetadataFileDF = spark + .createDataset(otherMetadataFiles, Encoders.STRING()) + .toDF("file_path"); + return otherMetadataFileDF; + } + + protected Dataset buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) { + Dataset manifestDF = buildManifestFileDF(spark); + Dataset manifestListDF = buildManifestListDF(spark, table); + Dataset otherMetadataFileDF = buildOtherMetadataFileDF(spark, ops); + + return manifestDF.union(otherMetadataFileDF).union(manifestListDF); + } } diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java new file mode 100644 index 000000000000..e81c384ec2fa --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +public class ExpireSnapshotActionResult { + + private final Long dataFilesDeleted; + private final Long manifestFilesDeleted; + private final Long manifestListsDeleted; + private final Long otherDeleted; + + public ExpireSnapshotActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted, + Long otherDeleted) { + this.dataFilesDeleted = dataFilesDeleted; + this.manifestFilesDeleted = manifestFilesDeleted; + this.manifestListsDeleted = manifestListsDeleted; + this.otherDeleted = otherDeleted; + } + + public Long getDataFilesDeleted() { + return dataFilesDeleted; + } + + public Long getManifestFilesDeleted() { + return manifestFilesDeleted; + } + + public Long getManifestListsDeleted() { + return manifestListsDeleted; + } + + public Long getOtherDeleted() { + return otherDeleted; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java new file mode 100644 index 000000000000..677df6d5f2b9 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExpireSnapshotsAction extends BaseAction { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class); + + private final SparkSession spark; + private final JavaSparkContext sparkContext; + private final Table table; + private final TableOperations ops; + private final ExpireSnapshots localExpireSnapshots; + private final TableMetadata base; + private static final String DATAFILE = "Data File"; + private static final String MANIFEST = "Manifest"; + private static final String MANIFESTLIST = "Manifest List"; + private static final String OTHER = "Other"; + + private final Consumer defaultDelete = new Consumer() { + @Override + public void accept(String file) { + ops.io().deleteFile(file); + } + }; + private Consumer deleteFunc = defaultDelete; + + + ExpireSnapshotsAction(SparkSession spark, Table table) { + this.spark = spark; + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.ops = ((HasTableOperations) table).operations(); + this.base = ops.current(); + this.localExpireSnapshots = table.expireSnapshots().cleanExpiredFiles(false); + } + + public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) { + localExpireSnapshots.expireSnapshotId(expireSnapshotId); + return this; + } + + public ExpireSnapshotsAction expireOlderThan(long timestampMillis) { + localExpireSnapshots.expireOlderThan(timestampMillis); + return this; + } + + public ExpireSnapshotsAction retainLast(int numSnapshots) { + localExpireSnapshots.retainLast(numSnapshots); + return this; + } + + public ExpireSnapshotsAction deleteWith(Consumer newDeleteFunc) { + deleteFunc = newDeleteFunc; + return this; + } + + + @Override + protected Table table() { + return table; + } + + private Dataset appendTypeString(Dataset ds, String type) { + return ds.select(new Column("file_path"), functions.lit(type).as("DataFile")); + } + + private Dataset getValidFileDF() { + return appendTypeString(buildValidDataFileDF(spark), DATAFILE) + .union(appendTypeString(buildManifestFileDF(spark), MANIFEST)) + .union(appendTypeString(buildManifestListDF(spark, table), MANIFESTLIST)) + .union(appendTypeString(buildOtherMetadataFileDF(spark, ops), OTHER)); + } + + private Set getFilesOfType(List files, String type) { + return files.stream() + .filter(row -> row.getString(1).equals(type)) + .map(row -> row.getString(0)) + .collect(Collectors.toSet()); + } + + @Override + public ExpireSnapshotActionResult execute() { + + Dataset originalFiles = getValidFileDF().persist(); + originalFiles.count(); // Trigger Persist + + 
localExpireSnapshots.commit(); + + Dataset validFiles = getValidFileDF(); + + List filesToDelete = originalFiles.except(validFiles).collectAsList(); + + LOG.warn("Deleting {} files", filesToDelete.size()); + return new ExpireSnapshotActionResult( + deleteFiles(getFilesOfType(filesToDelete, DATAFILE), DATAFILE), + deleteFiles(getFilesOfType(filesToDelete, MANIFEST), MANIFEST), + deleteFiles(getFilesOfType(filesToDelete, MANIFESTLIST), MANIFESTLIST), + deleteFiles(getFilesOfType(filesToDelete, OTHER), OTHER)); + } + + private Long deleteFiles(Set paths, String fileType) { + LOG.warn("{}s to delete: {}", fileType, Joiner.on(", ").join(paths)); + AtomicReference deleteCount = new AtomicReference<>(0L); + + Tasks.foreach(paths) + .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished() + .executeWith(ThreadPools.getWorkerPool()) + .onFailure((manifest, exc) -> LOG.warn("Delete failed for {}: {}", fileType, manifest, exc)) + .run(file -> { + deleteFunc.accept(file); + deleteCount.updateAndGet(v -> v + 1); + }); + return deleteCount.get(); + } + +} diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java index d918d5dc156f..13684fa4b7b0 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java @@ -31,10 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.HasTableOperations; -import org.apache.iceberg.MetadataTableType; -import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HiddenPathFilter; @@ -148,8 +145,8 @@ public RemoveOrphanFilesAction deleteWith(Consumer newDeleteFunc) { @Override public List execute() { - Dataset validDataFileDF = buildValidDataFileDF(); - Dataset validMetadataFileDF = buildValidMetadataFileDF(); + Dataset validDataFileDF = buildValidDataFileDF(spark); + Dataset validMetadataFileDF = buildValidMetadataFileDF(spark, table, ops); Dataset validFileDF = validDataFileDF.union(validMetadataFileDF); Dataset actualFileDF = buildActualFileDF(); @@ -170,43 +167,6 @@ public List execute() { return orphanFiles; } - private Dataset buildValidDataFileDF() { - String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES); - return spark.read().format("iceberg") - .load(allDataFilesMetadataTable) - .select("file_path"); - } - - private Dataset buildValidMetadataFileDF() { - String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS); - Dataset manifestDF = spark.read().format("iceberg") - .load(allManifestsMetadataTable) - .selectExpr("path as file_path"); - - List otherMetadataFiles = Lists.newArrayList(); - - for (Snapshot snapshot : table.snapshots()) { - String manifestListLocation = snapshot.manifestListLocation(); - if (manifestListLocation != null) { - otherMetadataFiles.add(manifestListLocation); - } - } - - otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); - - TableMetadata metadata = ops.current(); - otherMetadataFiles.add(metadata.metadataFileLocation()); - for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { - otherMetadataFiles.add(previousMetadataFile.file()); - } - - Dataset 
otherMetadataFileDF = spark - .createDataset(otherMetadataFiles, Encoders.STRING()) - .toDF("file_path"); - - return manifestDF.union(otherMetadataFileDF); - } - private Dataset buildActualFileDF() { List subDirs = Lists.newArrayList(); List matchingFiles = Lists.newArrayList(); diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java new file mode 100644 index 000000000000..c61f2748a802 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public abstract class TestExpireSnapshotsAction extends SparkTestBase { + + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + private static final Schema SCHEMA = new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private File tableDir; + private String tableLocation = null; + + @Before + public void setupTableLocation() throws Exception { + this.tableDir = temp.newFolder(); + this.tableLocation = tableDir.toURI().toString(); + } + + private void checkExpirationResults(Long expectedDatafiles, Long expectedManifestsDeleted, + Long expectedManifestListsDeleted, ExpireSnapshotActionResult results, Long expectedOther) { + + Assert.assertEquals("Incorrect number of manifest files deleted", + expectedManifestsDeleted, results.getManifestFilesDeleted()); + Assert.assertEquals("Incorrect number of datafiles deleted", + expectedDatafiles, results.getDataFilesDeleted()); + 
Assert.assertEquals("Incorrect number of manifest lists deleted", + expectedManifestListsDeleted, results.getManifestListsDeleted()); + Assert.assertEquals("Incorrect number of other files deleted", + expectedOther, results.getOtherDeleted()); + } + + + @Test + public void testFilesCleaned() throws Exception { + Table table = TABLES + .create(SCHEMA, + PartitionSpec.unpartitioned(), + Maps.newHashMap(), + tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + List expiredDataFiles = Files + .list(tableDir.toPath().resolve("data")) + .collect(Collectors.toList()); + + Assert.assertEquals("There should be a data file to delete but there was none.", + 2, expiredDataFiles.size()); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("overwrite") + .save(tableLocation); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + long end = System.currentTimeMillis(); + while (end <= table.currentSnapshot().timestampMillis()) { + end = System.currentTimeMillis(); + } + + ExpireSnapshotActionResult results = + Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); + + table.refresh(); + + Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, + Iterables.size(table.snapshots())); + + for (Path p : expiredDataFiles) { + Assert.assertFalse(String.format("File %s still exists but should have been deleted", p), + Files.exists(p)); + } + + checkExpirationResults(1L, 2L, 2L, results, 0L); + } + + @Test + public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { + Table table = TABLES + .create(SCHEMA, + PartitionSpec.unpartitioned(), + Maps.newHashMap(), + tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + ExpireSnapshotActionResult results = + Actions.forTable(table).expireSnapshots().execute(); + + checkExpirationResults(0L, 0L, 0L, results, 0L); + } + + @Test + public void testCleanupRepeatedOverwrites() throws Exception { + Table table = TABLES + .create(SCHEMA, + PartitionSpec.unpartitioned(), + Maps.newHashMap(), + tableLocation); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + for (int i = 0; i < 10; i++) { + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("overwrite") + .save(tableLocation); + } + + long end = System.currentTimeMillis(); + while (end <= table.currentSnapshot().timestampMillis()) { + end = System.currentTimeMillis(); + } + + ExpireSnapshotActionResult results = + Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); + + table.refresh(); + checkExpirationResults(10L, 19L, 10L, results, 0L); + } + +} + diff --git a/spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java b/spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java new file mode 
100644 index 000000000000..d613e09d7175 --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction24.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +public class TestExpireSnapshotsAction24 extends TestExpireSnapshotsAction{ +} diff --git a/spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java b/spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java new file mode 100644 index 000000000000..a05ed3f5664e --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction3.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions; + +import org.apache.iceberg.actions.TestExpireSnapshotsAction; + +public class TestExpireSnapshotsAction3 extends TestExpireSnapshotsAction { +} From 2e0af0a1121d923978344676d593e46d89686962 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Wed, 5 Aug 2020 17:00:28 -0500 Subject: [PATCH 2/6] Reviewer Comments Lazy construction of Expire Snapshots Action. 
Processing of deletes using Local Iterator Ignoring Versioning Files Adding ExecutorService Option Like RemoveSnapshots --- .../java/org/apache/iceberg/util/Tasks.java | 4 + .../apache/iceberg/actions/BaseAction.java | 22 +-- .../actions/ExpireSnapshotsAction.java | 160 ++++++++++-------- ....java => ExpireSnapshotsActionResult.java} | 10 +- .../actions/TestExpireSnapshotsAction.java | 16 +- 5 files changed, 109 insertions(+), 103 deletions(-) rename spark/src/main/java/org/apache/iceberg/actions/{ExpireSnapshotActionResult.java => ExpireSnapshotsActionResult.java} (81%) diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index 430813294bf2..a683e666fb2f 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -555,6 +555,10 @@ public static Builder range(int upTo) { return new Builder<>(new Range(upTo)); } + public static Builder foreach(Iterator items) { + return new Builder<>(() -> items); + } + public static Builder foreach(Iterable items) { return new Builder<>(items); } diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java index 1059f40f4060..3d06317d8aa1 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java @@ -54,22 +54,16 @@ protected String metadataTableName(MetadataTableType type) { protected Dataset buildValidDataFileDF(SparkSession spark) { String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES); - return spark.read().format("iceberg") - .load(allDataFilesMetadataTable) - .select("file_path"); + return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path"); } protected Dataset buildManifestFileDF(SparkSession spark) { String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS); - Dataset manifestDF = spark.read().format("iceberg") - .load(allManifestsMetadataTable) - .selectExpr("path as file_path"); - return manifestDF; + return spark.read().format("iceberg").load(allManifestsMetadataTable).selectExpr("path as file_path"); } protected Dataset buildManifestListDF(SparkSession spark, Table table) { List manifestLists = Lists.newArrayList(); - for (Snapshot snapshot : table.snapshots()) { String manifestListLocation = snapshot.manifestListLocation(); if (manifestListLocation != null) { @@ -77,26 +71,20 @@ protected Dataset buildManifestListDF(SparkSession spark, Table table) { } } - Dataset manifestListDF = spark - .createDataset(manifestLists, Encoders.STRING()) - .toDF("file_path"); - - return manifestListDF; + return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path"); } protected Dataset buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) { List otherMetadataFiles = Lists.newArrayList(); otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); + TableMetadata metadata = ops.current(); otherMetadataFiles.add(metadata.metadataFileLocation()); for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { otherMetadataFiles.add(previousMetadataFile.file()); } - Dataset otherMetadataFileDF = spark - .createDataset(otherMetadataFiles, Encoders.STRING()) - .toDF("file_path"); - return otherMetadataFileDF; + return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path"); } protected Dataset 
buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) { diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java index 677df6d5f2b9..ac8a2c60564e 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java @@ -19,21 +19,17 @@ package org.apache.iceberg.actions; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.stream.Collectors; import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; -import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -42,115 +38,141 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ExpireSnapshotsAction extends BaseAction { +public class ExpireSnapshotsAction extends BaseAction { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class); + private static final String DATA_FILE = "Data File"; + private static final String MANIFEST = "Manifest"; + private static final String MANIFEST_LIST = "Manifest List"; + + // Creates an executor service that runs each task in the thread that invokes execute/submit. + private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = MoreExecutors.newDirectExecutorService(); + private final SparkSession spark; - private final JavaSparkContext sparkContext; private final Table table; private final TableOperations ops; - private final ExpireSnapshots localExpireSnapshots; - private final TableMetadata base; - private static final String DATAFILE = "Data File"; - private static final String MANIFEST = "Manifest"; - private static final String MANIFESTLIST = "Manifest List"; - private static final String OTHER = "Other"; - private final Consumer defaultDelete = new Consumer() { @Override public void accept(String file) { ops.io().deleteFile(file); } }; - private Consumer deleteFunc = defaultDelete; + private Long expireSnapshotIdValue = null; + private Long expireOlderThanValue = null; + private Integer retainLastValue = null; + private Consumer deleteFunc = defaultDelete; + private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE; ExpireSnapshotsAction(SparkSession spark, Table table) { this.spark = spark; - this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); this.table = table; this.ops = ((HasTableOperations) table).operations(); - this.base = ops.current(); - this.localExpireSnapshots = table.expireSnapshots().cleanExpiredFiles(false); + } + + @Override + protected Table table() { + return table; + } + + /** + * An executor service used when deleting files. 
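+ * (For example, {@code Executors.newFixedThreadPool(4)}, as the parallel cleanup test below does; the pool size is illustrative, not a recommendation.)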
Only used during the local delete phase of this Spark action + * @param executorService the service to use + * @return this for method chaining + */ + public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) { + this.deleteExecutorService = executorService; + return this; } public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) { - localExpireSnapshots.expireSnapshotId(expireSnapshotId); + this.expireSnapshotIdValue = expireSnapshotId; return this; } public ExpireSnapshotsAction expireOlderThan(long timestampMillis) { - localExpireSnapshots.expireOlderThan(timestampMillis); + this.expireOlderThanValue = timestampMillis; return this; } public ExpireSnapshotsAction retainLast(int numSnapshots) { - localExpireSnapshots.retainLast(numSnapshots); + this.retainLastValue = numSnapshots; return this; } public ExpireSnapshotsAction deleteWith(Consumer newDeleteFunc) { - deleteFunc = newDeleteFunc; + this.deleteFunc = newDeleteFunc; return this; } - @Override - protected Table table() { - return table; - } + public ExpireSnapshotsActionResult execute() { + //Metadata before Expiration + Dataset originalFiles = buildValidFileDF().persist(); + originalFiles.count(); // Action to trigger persist + + //Perform Expiration + ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false); + if (expireSnapshotIdValue != null) { + expireSnaps = expireSnaps.expireSnapshotId(expireSnapshotIdValue); + } + if (expireOlderThanValue != null) { + expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue); + } + if (retainLastValue != null) { + expireSnaps = expireSnaps.retainLast(retainLastValue); + } + expireSnaps.commit(); - private Dataset appendTypeString(Dataset ds, String type) { - return ds.select(new Column("file_path"), functions.lit(type).as("DataFile")); - } + // Metadata after Expiration + Dataset validFiles = buildValidFileDF(); + Dataset filesToDelete = originalFiles.except(validFiles); - private Dataset getValidFileDF() { - return appendTypeString(buildValidDataFileDF(spark), DATAFILE) - .union(appendTypeString(buildManifestFileDF(spark), MANIFEST)) - .union(appendTypeString(buildManifestListDF(spark, table), MANIFESTLIST)) - .union(appendTypeString(buildOtherMetadataFileDF(spark, ops), OTHER)); + ExpireSnapshotsActionResult result = deleteFiles(filesToDelete.toLocalIterator()); + originalFiles.unpersist(); + return result; } - private Set getFilesOfType(List files, String type) { - return files.stream() - .filter(row -> row.getString(1).equals(type)) - .map(row -> row.getString(0)) - .collect(Collectors.toSet()); + private Dataset appendTypeString(Dataset ds, String type) { + return ds.select(new Column("file_path"), functions.lit(type).as("file_type")); } - @Override - public ExpireSnapshotActionResult execute() { - - Dataset originalFiles = getValidFileDF().persist(); - originalFiles.count(); // Trigger Persist - - localExpireSnapshots.commit(); - - Dataset validFiles = getValidFileDF(); - - List filesToDelete = originalFiles.except(validFiles).collectAsList(); - - LOG.warn("Deleting {} files", filesToDelete.size()); - return new ExpireSnapshotActionResult( - deleteFiles(getFilesOfType(filesToDelete, DATAFILE), DATAFILE), - deleteFiles(getFilesOfType(filesToDelete, MANIFEST), MANIFEST), - deleteFiles(getFilesOfType(filesToDelete, MANIFESTLIST), MANIFESTLIST), - deleteFiles(getFilesOfType(filesToDelete, OTHER), OTHER)); + private Dataset buildValidFileDF() { + return appendTypeString(buildValidDataFileDF(spark), DATA_FILE) + 
.union(appendTypeString(buildManifestFileDF(spark), MANIFEST)) + .union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST)); } - private Long deleteFiles(Set paths, String fileType) { - LOG.warn("{}s to delete: {}", fileType, Joiner.on(", ").join(paths)); - AtomicReference deleteCount = new AtomicReference<>(0L); + private ExpireSnapshotsActionResult deleteFiles(Iterator paths) { + AtomicLong dataFileCount = new AtomicLong(0L); + AtomicLong manifestCount = new AtomicLong(0L); + AtomicLong manifestListCount = new AtomicLong(0L); Tasks.foreach(paths) .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished() - .executeWith(ThreadPools.getWorkerPool()) - .onFailure((manifest, exc) -> LOG.warn("Delete failed for {}: {}", fileType, manifest, exc)) - .run(file -> { + .executeWith(deleteExecutorService) + .onFailure((fileInfo, exc) -> + LOG.warn("Delete failed for {}: {}", fileInfo.getString(1), fileInfo.getString(0), exc)) + .run(fileInfo -> { + String file = fileInfo.getString(0); + String type = fileInfo.getString(1); deleteFunc.accept(file); - deleteCount.updateAndGet(v -> v + 1); + switch (type) { + case DATA_FILE: + dataFileCount.incrementAndGet(); + LOG.trace("Deleted Data File: {}", file); + break; + case MANIFEST: + manifestCount.incrementAndGet(); + LOG.warn("Deleted Manifest: {}", file); + break; + case MANIFEST_LIST: + manifestListCount.incrementAndGet(); + LOG.warn("Deleted Manifest List: {}", file); + break; + } }); - return deleteCount.get(); + LOG.warn("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get()); + return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get()); } - } diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java similarity index 81% rename from spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java rename to spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java index e81c384ec2fa..609e72532275 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotActionResult.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java @@ -19,19 +19,16 @@ package org.apache.iceberg.actions; -public class ExpireSnapshotActionResult { +public class ExpireSnapshotsActionResult { private final Long dataFilesDeleted; private final Long manifestFilesDeleted; private final Long manifestListsDeleted; - private final Long otherDeleted; - public ExpireSnapshotActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted, - Long otherDeleted) { + public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) { this.dataFilesDeleted = dataFilesDeleted; this.manifestFilesDeleted = manifestFilesDeleted; this.manifestListsDeleted = manifestListsDeleted; - this.otherDeleted = otherDeleted; } public Long getDataFilesDeleted() { @@ -46,7 +43,4 @@ public Long getManifestListsDeleted() { return manifestListsDeleted; } - public Long getOtherDeleted() { - return otherDeleted; - } } diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java index c61f2748a802..79f78a3627ac 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java +++ 
b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java @@ -67,7 +67,7 @@ public void setupTableLocation() throws Exception { } private void checkExpirationResults(Long expectedDatafiles, Long expectedManifestsDeleted, - Long expectedManifestListsDeleted, ExpireSnapshotActionResult results, Long expectedOther) { + Long expectedManifestListsDeleted, ExpireSnapshotsActionResult results) { Assert.assertEquals("Incorrect number of manifest files deleted", expectedManifestsDeleted, results.getManifestFilesDeleted()); @@ -75,8 +75,6 @@ private void checkExpirationResults(Long expectedDatafiles, Long expectedManifes expectedDatafiles, results.getDataFilesDeleted()); Assert.assertEquals("Incorrect number of manifest lists deleted", expectedManifestListsDeleted, results.getManifestListsDeleted()); - Assert.assertEquals("Incorrect number of other files deleted", - expectedOther, results.getOtherDeleted()); } @@ -124,7 +122,7 @@ public void testFilesCleaned() throws Exception { end = System.currentTimeMillis(); } - ExpireSnapshotActionResult results = + ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); table.refresh(); @@ -137,7 +135,7 @@ public void testFilesCleaned() throws Exception { Files.exists(p)); } - checkExpirationResults(1L, 2L, 2L, results, 0L); + checkExpirationResults(1L, 2L, 2L, results); } @Test @@ -160,10 +158,10 @@ public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { .mode("append") .save(tableLocation); - ExpireSnapshotActionResult results = + ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().execute(); - checkExpirationResults(0L, 0L, 0L, results, 0L); + checkExpirationResults(0L, 0L, 0L, results); } @Test @@ -199,11 +197,11 @@ public void testCleanupRepeatedOverwrites() throws Exception { end = System.currentTimeMillis(); } - ExpireSnapshotActionResult results = + ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); table.refresh(); - checkExpirationResults(10L, 19L, 10L, results, 0L); + checkExpirationResults(10L, 19L, 10L, results); } } From e73f76971caa117d3fbadf8c70450f46a8ef51a2 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Thu, 6 Aug 2020 19:07:13 -0500 Subject: [PATCH 3/6] Port over all the tests from TestRemoveSnapshots --- .../actions/ExpireSnapshotsAction.java | 107 ++- .../actions/TestExpireSnapshotsAction.java | 709 ++++++++++++++++-- 2 files changed, 720 insertions(+), 96 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java index ac8a2c60564e..a37d90b7137d 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java @@ -28,6 +28,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.util.Tasks; import org.apache.spark.sql.Column; @@ -38,6 +39,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * An action which performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark + * to determine the delta in files between the pre and
post-expiration table metadata. All of the same + restrictions that apply to RemoveSnapshots also apply to this action. + *
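+ * <p>
+ * A usage sketch (the timestamp and retention values are illustrative; the fluent API is the one added in this patch):
+ * <pre>
+ *   ExpireSnapshotsActionResult result = Actions.forTable(table)
+ *       .expireSnapshots()
+ *       .expireOlderThan(tsMillis)   // expire snapshots older than this timestamp
+ *       .retainLast(2)               // but always keep the two most recent snapshots
+ *       .execute();                  // deletes unreachable files, returns per-type delete counts
+ * </pre>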

+ * This implementation uses the MetadataTables for the table being expired to list all Manifest and DataFiles. This + * is made into a Dataframe which is antiJoined with the same list read after the expiration. This operation will + * require a Shuffle so parallelism can be controlled through spark.sql.shuffle.partitions. The expiration is done + * locally using a direct call to RemoveSnapshots. Deletes are still performed locally after retrieving the results + * from the SparkExecutors. + */ public class ExpireSnapshotsAction extends BaseAction { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class); @@ -77,6 +89,7 @@ protected Table table() { /** * An executor service used when deleting files. Only used during the local delete phase of this Spark action + * Similar to {@link org.apache.iceberg.ExpireSnapshots#executeWith(ExecutorService)} * @param executorService the service to use * @return this for method chaining */ @@ -85,21 +98,47 @@ public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) return this; } + /** + * A specific snapshot to expire. + * Identical to {@link org.apache.iceberg.ExpireSnapshots#expireSnapshotId(long)} + * @param expireSnapshotId Id of the snapshot to expire + * @return this for method chaining + */ public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) { this.expireSnapshotIdValue = expireSnapshotId; return this; } + /** + * Expire all snapshots older than a given timestamp. + * Identical to {@link org.apache.iceberg.ExpireSnapshots#expireOlderThan(long)} + * @param timestampMillis all snapshots before this time will be expired + * @return this for method chaining + */ public ExpireSnapshotsAction expireOlderThan(long timestampMillis) { this.expireOlderThanValue = timestampMillis; return this; } + /** + * Retain at least x snapshots when expiring + * Identical to {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)} + * @param numSnapshots number of snapshots to leave + * @return this for method chaining + */ public ExpireSnapshotsAction retainLast(int numSnapshots) { + Preconditions.checkArgument(1 <= numSnapshots, + "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots); this.retainLastValue = numSnapshots; return this; } + /** + * The Consumer used on files which have been determined to be expired. By default uses a filesystem delete. 
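+ * A collecting consumer gives a dry run that records which files would be removed instead of deleting them,
+ * though the snapshot expiration itself still commits (the {@code wouldDelete} set is hypothetical, not part of this API):
+ * <pre>
+ *   Set&lt;String&gt; wouldDelete = Sets.newHashSet();
+ *   ExpireSnapshotsActionResult dryRun = action.deleteWith(wouldDelete::add).execute();
+ * </pre>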
+ * Identical to {@link org.apache.iceberg.ExpireSnapshots#deleteWith(Consumer)} + * @param newDeleteFunc Consumer which takes a path and deletes it + * @return this for method chaining + */ public ExpireSnapshotsAction deleteWith(Consumer newDeleteFunc) { this.deleteFunc = newDeleteFunc; return this; @@ -107,30 +146,37 @@ public ExpireSnapshotsAction deleteWith(Consumer newDeleteFunc) { @Override public ExpireSnapshotsActionResult execute() { - //Metadata before Expiration - Dataset originalFiles = buildValidFileDF().persist(); - originalFiles.count(); // Action to trigger persist - - //Perform Expiration - ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false); - if (expireSnapshotIdValue != null) { - expireSnaps = expireSnaps.expireSnapshotId(expireSnapshotIdValue); - } - if (expireOlderThanValue != null) { - expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue); - } - if (retainLastValue != null) { - expireSnaps = expireSnaps.retainLast(retainLastValue); + Dataset originalFiles = null; + try { + // Metadata before Expiration + originalFiles = buildValidFileDF().persist(); + // Action to trigger persist + originalFiles.count(); + + // Perform Expiration + ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false); + if (expireSnapshotIdValue != null) { + expireSnaps = expireSnaps.expireSnapshotId(expireSnapshotIdValue); + } + if (expireOlderThanValue != null) { + expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue); + } + if (retainLastValue != null) { + expireSnaps = expireSnaps.retainLast(retainLastValue); + } + expireSnaps.commit(); + + // Metadata after Expiration + Dataset validFiles = buildValidFileDF(); + Dataset filesToDelete = originalFiles.except(validFiles); + + ExpireSnapshotsActionResult result = deleteFiles(filesToDelete.toLocalIterator()); + return result; + } finally { + if (originalFiles != null) { + originalFiles.unpersist(); + } } - expireSnaps.commit(); - - // Metadata after Expiration - Dataset validFiles = buildValidFileDF(); - Dataset filesToDelete = originalFiles.except(validFiles); - - ExpireSnapshotsActionResult result = deleteFiles(filesToDelete.toLocalIterator()); - originalFiles.unpersist(); - return result; } private Dataset appendTypeString(Dataset ds, String type) { @@ -143,12 +189,17 @@ private Dataset buildValidFileDF() { .union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST)); } - private ExpireSnapshotsActionResult deleteFiles(Iterator paths) { + /** + * Deletes files passed to it based on their type. 
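+ * Counts are accumulated per file type (data file, manifest, manifest list); each delete is retried up to three
+ * times, stopping on {@link NotFoundException}, and remaining failures are logged rather than failing the action.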
+ * @param expiredFiles an Iterator of Spark Rows of the structure (path: String, type: String) + * @return Statistics on which files were deleted + */ + private ExpireSnapshotsActionResult deleteFiles(Iterator expiredFiles) { AtomicLong dataFileCount = new AtomicLong(0L); AtomicLong manifestCount = new AtomicLong(0L); AtomicLong manifestListCount = new AtomicLong(0L); - Tasks.foreach(paths) + Tasks.foreach(expiredFiles) .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished() .executeWith(deleteExecutorService) .onFailure((fileInfo, exc) -> @@ -164,15 +215,15 @@ private ExpireSnapshotsActionResult deleteFiles(Iterator paths) { break; case MANIFEST: manifestCount.incrementAndGet(); - LOG.warn("Deleted Manifest: {}", file); + LOG.debug("Deleted Manifest: {}", file); break; case MANIFEST_LIST: manifestListCount.incrementAndGet(); - LOG.warn("Deleted Manifest List: {}", file); + LOG.debug("Deleted Manifest List: {}", file); break; } }); - LOG.warn("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get()); + LOG.debug("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get()); return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get()); } } diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java index 79f78a3627ac..a085cd99a3f6 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java +++ b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java @@ -20,18 +20,34 @@ package org.apache.iceberg.actions; import java.io.File; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.spark.source.ThreeColumnRecord; import org.apache.iceberg.types.Types; @@ -54,16 +70,59 @@ public abstract class TestExpireSnapshotsAction extends SparkTestBase { optional(3, "c3", Types.StringType.get()) ); + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + + private static final List RECORDS = Lists.newArrayList(new ThreeColumnRecord(1, "AAAA", "AAAA")); + + static final DataFile FILE_A = DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) 
+ .withPartitionPath("c1=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_B = DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=1") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_C = DataFiles.builder(SPEC) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=2") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DataFile FILE_D = DataFiles.builder(SPEC) + .withPath("/path/to/data-d.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("c1=3") // easy way to set partition data for now + .withRecordCount(1) + .build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); private File tableDir; - private String tableLocation = null; + private String tableLocation; + private Table table; @Before public void setupTableLocation() throws Exception { this.tableDir = temp.newFolder(); this.tableLocation = tableDir.toURI().toString(); + this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation); + } + + private Dataset buildDF(List records) { + return spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + } + + private void writeDF(Dataset df, String mode) { + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode(mode) + .save(tableLocation); } private void checkExpirationResults(Long expectedDatafiles, Long expectedManifestsDeleted, @@ -80,42 +139,19 @@ private void checkExpirationResults(Long expectedDatafiles, Long expectedManifes @Test public void testFilesCleaned() throws Exception { - Table table = TABLES - .create(SCHEMA, - PartitionSpec.unpartitioned(), - Maps.newHashMap(), - tableLocation); + Dataset df = buildDF(RECORDS); - List records = Lists.newArrayList( - new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") - ); - - Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); - - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode("append") - .save(tableLocation); + writeDF(df, "append"); List expiredDataFiles = Files - .list(tableDir.toPath().resolve("data")) + .list(tableDir.toPath().resolve("data").resolve("c1=1")) .collect(Collectors.toList()); Assert.assertEquals("There should be a data file to delete but there was none.", 2, expiredDataFiles.size()); - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode("overwrite") - .save(tableLocation); - - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode("append") - .save(tableLocation); + writeDF(df, "overwrite"); + writeDF(df, "append"); long end = System.currentTimeMillis(); while (end <= table.currentSnapshot().timestampMillis()) { @@ -127,8 +163,7 @@ public void testFilesCleaned() throws Exception { table.refresh(); - Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, - Iterables.size(table.snapshots())); + Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots())); for (Path p : expiredDataFiles) { Assert.assertFalse(String.format("File %s still exists but should have been deleted", p), @@ -139,24 +174,62 @@ public void testFilesCleaned() throws Exception { } @Test - public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { - Table table = TABLES - .create(SCHEMA, - PartitionSpec.unpartitioned(), - Maps.newHashMap(), - tableLocation); + public void 
dataFilesCleanupWithParallelTasks() throws IOException { - List records = Lists.newArrayList( - new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") - ); + table.newFastAppend() + .appendFile(FILE_A) + .commit(); - Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + table.newFastAppend() + .appendFile(FILE_B) + .commit(); - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode("append") - .save(tableLocation); + table.newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)) + .commit(); + long thirdSnapshotId = table.currentSnapshot().snapshotId(); + + table.newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)) + .commit(); + long fourthSnapshotId = table.currentSnapshot().snapshotId(); + + long t4 = System.currentTimeMillis(); + while (t4 <= table.currentSnapshot().timestampMillis()) { + t4 = System.currentTimeMillis(); + } + + Set deletedFiles = Sets.newHashSet(); + Set deleteThreads = ConcurrentHashMap.newKeySet(); + AtomicInteger deleteThreadsIndex = new AtomicInteger(0); + + Actions.forTable(table).expireSnapshots() + .executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> { + Thread thread = new Thread(runnable); + thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement()); + thread.setDaemon(true); // daemon threads will be terminated abruptly when the JVM exits + return thread; + })) + .expireOlderThan(t4) + .deleteWith(s -> { + deleteThreads.add(Thread.currentThread().getName()); + deletedFiles.add(s); + }) + .execute(); + + // Verifies that the delete methods ran in the threads created by the provided ExecutorService ThreadFactory + Assert.assertEquals(deleteThreads, + Sets.newHashSet("remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3")); + + Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); + Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); + } + + @Test + public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { + Dataset df = buildDF(RECORDS); + + writeDF(df, "append"); ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().execute(); @@ -166,30 +239,12 @@ public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { @Test public void testCleanupRepeatedOverwrites() throws Exception { - Table table = TABLES - .create(SCHEMA, - PartitionSpec.unpartitioned(), - Maps.newHashMap(), - tableLocation); - - List records = Lists.newArrayList( - new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") - ); + Dataset df = buildDF(RECORDS); - Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); - - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode("append") - .save(tableLocation); + writeDF(df, "append"); for (int i = 0; i < 10; i++) { - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode("overwrite") - .save(tableLocation); + writeDF(df, "overwrite"); } long end = System.currentTimeMillis(); @@ -200,9 +255,527 @@ public void testCleanupRepeatedOverwrites() throws Exception { ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); - table.refresh(); checkExpirationResults(10L, 19L, 10L, results); } + @Test + public void testRetainLastWithExpireOlderThan() { + long t0 = System.currentTimeMillis(); + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .commit(); + long firstSnapshotId = 
table.currentSnapshot().snapshotId(); + long t1 = System.currentTimeMillis(); + while (t1 <= table.currentSnapshot().timestampMillis()) { + t1 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_B) // data_bucket=1 + .commit(); + + long t2 = System.currentTimeMillis(); + while (t2 <= table.currentSnapshot().timestampMillis()) { + t2 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); + + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + // Retain last 2 snapshots + Actions.forTable(table).expireSnapshots() + .expireOlderThan(t3) + .retainLast(2) + .execute(); + + Assert.assertEquals("Should have two snapshots.", + 2, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("First snapshot should not present.", + null, table.snapshot(firstSnapshotId)); + } + + @Test + public void testRetainLastWithExpireById() { + long t0 = System.currentTimeMillis(); + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + long t1 = System.currentTimeMillis(); + while (t1 <= table.currentSnapshot().timestampMillis()) { + t1 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_B) // data_bucket=1 + .commit(); + + long t2 = System.currentTimeMillis(); + while (t2 <= table.currentSnapshot().timestampMillis()) { + t2 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); + + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + // Retain last 3 snapshots, but explicitly remove the first snapshot + Actions.forTable(table).expireSnapshots() + .expireSnapshotId(firstSnapshotId) + .retainLast(3) + .execute(); + + Assert.assertEquals("Should have two snapshots.", + 2, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("First snapshot should not present.", + null, table.snapshot(firstSnapshotId)); + } + + @Test + public void testRetainLastWithTooFewSnapshots() { + long t0 = System.currentTimeMillis(); + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .appendFile(FILE_B) // data_bucket=1 + .commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + + long t1 = System.currentTimeMillis(); + while (t1 <= table.currentSnapshot().timestampMillis()) { + t1 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); + + long t2 = System.currentTimeMillis(); + while (t2 <= table.currentSnapshot().timestampMillis()) { + t2 = System.currentTimeMillis(); + } + + // Retain last 3 snapshots + Actions.forTable(table).expireSnapshots() + .expireOlderThan(t2) + .retainLast(3) + .execute(); + + Assert.assertEquals("Should have two snapshots", + 2, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("First snapshot should still present", + firstSnapshotId, table.snapshot(firstSnapshotId).snapshotId()); + } + + @Test + public void testRetainLastKeepsExpiringSnapshot() { + long t0 = System.currentTimeMillis(); + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .commit(); + long t1 = System.currentTimeMillis(); + while (t1 <= table.currentSnapshot().timestampMillis()) { + t1 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_B) // data_bucket=1 + .commit(); + + Snapshot 
secondSnapshot = table.currentSnapshot(); + long t2 = System.currentTimeMillis(); + while (t2 <= table.currentSnapshot().timestampMillis()) { + t2 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); + + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_D) // data_bucket=3 + .commit(); + + long t4 = System.currentTimeMillis(); + while (t4 <= table.currentSnapshot().timestampMillis()) { + t4 = System.currentTimeMillis(); + } + + // Retain last 2 snapshots and expire older than t3 + Actions.forTable(table).expireSnapshots() + .expireOlderThan(secondSnapshot.timestampMillis()) + .retainLast(2) + .execute(); + + Assert.assertEquals("Should have three snapshots.", + 3, Lists.newArrayList(table.snapshots()).size()); + Assert.assertNotNull("Second snapshot should present.", + table.snapshot(secondSnapshot.snapshotId())); + } + + @Test + public void testExpireOlderThanMultipleCalls() { + long t0 = System.currentTimeMillis(); + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .commit(); + long t1 = System.currentTimeMillis(); + while (t1 <= table.currentSnapshot().timestampMillis()) { + t1 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_B) // data_bucket=1 + .commit(); + + Snapshot secondSnapshot = table.currentSnapshot(); + long t2 = System.currentTimeMillis(); + while (t2 <= table.currentSnapshot().timestampMillis()) { + t2 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); + + Snapshot thirdSnapshot = table.currentSnapshot(); + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + // Retain last 2 snapshots and expire older than t3 + Actions.forTable(table).expireSnapshots() + .expireOlderThan(secondSnapshot.timestampMillis()) + .expireOlderThan(thirdSnapshot.timestampMillis()) + .execute(); + + Assert.assertEquals("Should have one snapshots.", + 1, Lists.newArrayList(table.snapshots()).size()); + Assert.assertNull("Second snapshot should not present.", + table.snapshot(secondSnapshot.snapshotId())); + } + + @Test + public void testRetainLastMultipleCalls() { + long t0 = System.currentTimeMillis(); + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .commit(); + long t1 = System.currentTimeMillis(); + while (t1 <= table.currentSnapshot().timestampMillis()) { + t1 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_B) // data_bucket=1 + .commit(); + + Snapshot secondSnapshot = table.currentSnapshot(); + long t2 = System.currentTimeMillis(); + while (t2 <= table.currentSnapshot().timestampMillis()) { + t2 = System.currentTimeMillis(); + } + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); + + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + // Retain last 2 snapshots and expire older than t3 + Actions.forTable(table).expireSnapshots() + .expireOlderThan(t3) + .retainLast(2) + .retainLast(1) + .execute(); + + Assert.assertEquals("Should have one snapshots.", + 1, Lists.newArrayList(table.snapshots()).size()); + Assert.assertNull("Second snapshot should not present.", + table.snapshot(secondSnapshot.snapshotId())); + } + + @Test + public void testRetainZeroSnapshots() { + 
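+    // retainLast(0) should be rejected before any expiration work starts. A minimal
+    // sketch of the guard this test presumably relies on (Guava-style Preconditions,
+    // as imported elsewhere in this series; exact placement assumed):
+    //   Preconditions.checkArgument(1 <= numSnapshots,
+    //       "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);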
AssertHelpers.assertThrows("Should fail retain 0 snapshots " + + "because number of snapshots to retain cannot be zero", + IllegalArgumentException.class, + "Number of snapshots to retain must be at least 1, cannot be: 0", + () -> Actions.forTable(table).expireSnapshots().retainLast(0).execute()); + } + + @Test + public void testScanExpiredManifestInValidSnapshotAppend() { + table.newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.newOverwrite() + .addFile(FILE_C) + .deleteFile(FILE_A) + .commit(); + + table.newAppend() + .appendFile(FILE_D) + .commit(); + + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + Set deletedFiles = Sets.newHashSet(); + + Actions.forTable(table).expireSnapshots() + .expireOlderThan(t3) + .deleteWith(deletedFiles::add) + .execute(); + + Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); + + } + + @Test + public void testScanExpiredManifestInValidSnapshotFastAppend() { + table.updateProperties() + .set(TableProperties.MANIFEST_MERGE_ENABLED, "true") + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + table.newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + table.newOverwrite() + .addFile(FILE_C) + .deleteFile(FILE_A) + .commit(); + + table.newFastAppend() + .appendFile(FILE_D) + .commit(); + + long t3 = System.currentTimeMillis(); + while (t3 <= table.currentSnapshot().timestampMillis()) { + t3 = System.currentTimeMillis(); + } + + Set deletedFiles = Sets.newHashSet(); + + Actions.forTable(table).expireSnapshots() + .expireOlderThan(t3) + .deleteWith(deletedFiles::add) + .execute(); + + Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); + } + + /** + * Test on table below, and expiring the staged commit `B` using `expireOlderThan` API. + * Table: A - C + * ` B (staged) + */ + @Test + public void testWithExpiringDanglingStageCommit() { + // `A` commit + table.newAppend() + .appendFile(FILE_A) + .commit(); + + // `B` staged commit + table.newAppend() + .appendFile(FILE_B) + .stageOnly() + .commit(); + + TableMetadata base = ((BaseTable) table).operations().current(); + Snapshot snapshotA = base.snapshots().get(0); + Snapshot snapshotB = base.snapshots().get(1); + + // `C` commit + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Set deletedFiles = new HashSet<>(); + + // Expire all commits including dangling staged snapshot. + Actions.forTable(table).expireSnapshots() + .deleteWith(deletedFiles::add) + .expireOlderThan(snapshotB.timestampMillis() + 1) + .execute(); + + Set expectedDeletes = new HashSet<>(); + expectedDeletes.add(snapshotA.manifestListLocation()); + + // Files should be deleted of dangling staged snapshot + snapshotB.addedFiles().forEach(i -> { + expectedDeletes.add(i.path().toString()); + }); + + // ManifestList should be deleted too + expectedDeletes.add(snapshotB.manifestListLocation()); + snapshotB.dataManifests().forEach(file -> { + //Only the manifest of B should be deleted. 
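+      // (ManifestFile.snapshotId() reports the snapshot that added the manifest, so
+      // the check below keeps B's own manifest and skips any manifest inherited from
+      // snapshot A, which A still references and which must survive.)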
+ if (file.snapshotId() == snapshotB.snapshotId()) { + expectedDeletes.add(file.path()); + } + }); + Assert.assertSame("Files deleted count should be expected", expectedDeletes.size(), deletedFiles.size()); + //Take the diff + expectedDeletes.removeAll(deletedFiles); + Assert.assertTrue("Exactly same files should be deleted", expectedDeletes.isEmpty()); + } + + /** + * Expire cherry-pick the commit as shown below, when `B` is in table's current state + * Table: + * A - B - C <--current snapshot + * `- D (source=B) + */ + @Test + public void testWithCherryPickTableSnapshot() { + // `A` commit + table.newAppend() + .appendFile(FILE_A) + .commit(); + Snapshot snapshotA = table.currentSnapshot(); + + // `B` commit + Set deletedAFiles = new HashSet<>(); + table.newOverwrite() + .addFile(FILE_B) + .deleteFile(FILE_A) + .deleteWith(deletedAFiles::add) + .commit(); + Assert.assertTrue("No files should be physically deleted", deletedAFiles.isEmpty()); + + // pick the snapshot 'B` + Snapshot snapshotB = table.currentSnapshot(); + + // `C` commit to let cherry-pick take effect, and avoid fast-forward of `B` with cherry-pick + table.newAppend() + .appendFile(FILE_C) + .commit(); + Snapshot snapshotC = table.currentSnapshot(); + + // Move the table back to `A` + table.manageSnapshots() + .setCurrentSnapshot(snapshotA.snapshotId()) + .commit(); + + // Generate A -> `D (B)` + table.manageSnapshots() + .cherrypick(snapshotB.snapshotId()) + .commit(); + Snapshot snapshotD = table.currentSnapshot(); + + // Move the table back to `C` + table.manageSnapshots() + .setCurrentSnapshot(snapshotC.snapshotId()) + .commit(); + List deletedFiles = new ArrayList<>(); + + // Expire `C` + Actions.forTable(table).expireSnapshots() + .deleteWith(deletedFiles::add) + .expireOlderThan(snapshotC.timestampMillis() + 1) + .execute(); + + // Make sure no dataFiles are deleted for the B, C, D snapshot + Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> { + i.addedFiles().forEach(item -> { + Assert.assertFalse(deletedFiles.contains(item.path().toString())); + }); + }); + } + + /** + * Test on table below, and expiring `B` which is not in current table state. + * 1) Expire `B` + * 2) All commit + * Table: A - C - D (B) + * ` B (staged) + */ + @Test + public void testWithExpiringStagedThenCherrypick() { + // `A` commit + table.newAppend() + .appendFile(FILE_A) + .commit(); + + // `B` commit + table.newAppend() + .appendFile(FILE_B) + .stageOnly() + .commit(); + + // pick the snapshot that's staged but not committed + TableMetadata base = ((BaseTable) table).operations().current(); + Snapshot snapshotB = base.snapshots().get(1); + + // `C` commit to let cherry-pick take effect, and avoid fast-forward of `B` with cherry-pick + table.newAppend() + .appendFile(FILE_C) + .commit(); + + // `D (B)` cherry-pick commit + table.manageSnapshots() + .cherrypick(snapshotB.snapshotId()) + .commit(); + + base = ((BaseTable) table).operations().current(); + Snapshot snapshotD = base.snapshots().get(3); + + List deletedFiles = new ArrayList<>(); + + // Expire `B` commit. 
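+    // (Expiring B alone must leave its data files intact: the cherry-picked
+    // snapshot D committed the same files, so they are still referenced.)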
+ Actions.forTable(table).expireSnapshots() + .deleteWith(deletedFiles::add) + .expireSnapshotId(snapshotB.snapshotId()) + .execute(); + + // Make sure no dataFiles are deleted for the staged snapshot + Lists.newArrayList(snapshotB).forEach(i -> { + i.addedFiles().forEach(item -> { + Assert.assertFalse(deletedFiles.contains(item.path().toString())); + }); + }); + + // Expire all snapshots including cherry-pick + Actions.forTable(table).expireSnapshots() + .deleteWith(deletedFiles::add) + .expireOlderThan(table.currentSnapshot().timestampMillis() + 1) + .execute(); + + // Make sure no dataFiles are deleted for the staged and cherry-pick + Lists.newArrayList(snapshotB, snapshotD).forEach(i -> { + i.addedFiles().forEach(item -> { + Assert.assertFalse(deletedFiles.contains(item.path().toString())); + }); + }); + } + } From 9cc4e831679d0aab1e4409210f3a7c57094c440c Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Fri, 7 Aug 2020 10:38:45 -0500 Subject: [PATCH 4/6] Review Comments Move getManifestLists / getOtherManifestPaths to Core Module Fixup of doc typos --- .../org/apache/iceberg/util/TableUtil.java | 66 +++++++++++++++++++ .../apache/iceberg/actions/BaseAction.java | 23 +------ .../actions/ExpireSnapshotsAction.java | 12 ++-- .../actions/TestExpireSnapshotsAction.java | 1 - 4 files changed, 75 insertions(+), 27 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/util/TableUtil.java diff --git a/core/src/main/java/org/apache/iceberg/util/TableUtil.java b/core/src/main/java/org/apache/iceberg/util/TableUtil.java new file mode 100644 index 000000000000..10b30f488123 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/TableUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.util.List; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TableUtil { + + private TableUtil(){} + + /** + * Returns all the path locations of all Manifest Lists for a given table + * @param table the table + * @return the paths of the Manifest Lists + */ + public static List getManifestListPaths(Table table) { + List manifestLists = Lists.newArrayList(); + for (Snapshot snapshot : table.snapshots()) { + String manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null) { + manifestLists.add(manifestListLocation); + } + } + return manifestLists; + } + + /** + * Returns all Metadata file paths which may not be in the current metadata. Specifically + * this includes "version-hint" files as well as entries in metadata.previousFiles. 
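+   * (These paths feed buildValidMetadataFileDF, so the version-hint file and prior
+   * metadata.json locations are treated as valid files rather than deletion candidates.)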
+ * @param ops TableOperations for the table we will be getting paths from + * @return a list of paths to metadata files + */ + public static List getOtherMetadataFilePaths(TableOperations ops) { + List otherMetadataFiles = Lists.newArrayList(); + otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); + + TableMetadata metadata = ops.current(); + otherMetadataFiles.add(metadata.metadataFileLocation()); + for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { + otherMetadataFiles.add(previousMetadataFile.file()); + } + return otherMetadataFiles; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java index 3d06317d8aa1..abdb709cafb2 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java @@ -21,11 +21,9 @@ import java.util.List; import org.apache.iceberg.MetadataTableType; -import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.TableUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -63,27 +61,12 @@ protected Dataset buildManifestFileDF(SparkSession spark) { } protected Dataset buildManifestListDF(SparkSession spark, Table table) { - List manifestLists = Lists.newArrayList(); - for (Snapshot snapshot : table.snapshots()) { - String manifestListLocation = snapshot.manifestListLocation(); - if (manifestListLocation != null) { - manifestLists.add(manifestListLocation); - } - } - + List manifestLists = TableUtil.getManifestListPaths(table); return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path"); } protected Dataset buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) { - List otherMetadataFiles = Lists.newArrayList(); - otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); - - TableMetadata metadata = ops.current(); - otherMetadataFiles.add(metadata.metadataFileLocation()); - for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { - otherMetadataFiles.add(previousMetadataFile.file()); - } - + List otherMetadataFiles = TableUtil.getOtherMetadataFilePaths(ops); return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path"); } diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java index a37d90b7137d..70231a8b91b1 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java @@ -42,13 +42,13 @@ /** * An action which performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark * to to determine the delta in files between the pre and post-expiration table metadata. All of the same - * restrictions apply that apply to Remove Snapshots. + * restrictions of Remove Snapshots also apply to this action. *
<p>
- * This implementation uses the MetadataTables for the table being expired to list all Manifest and DataFiles. This
- * is made into a Dataframe which is antiJoined with the same list read after the expiration. This operation will
- * require a Shuffle so parallelism can be controlled through spark.sql.shuffle.partitions. The expiration is done
- * locally using a direct call to RemoveSnapshots. Deletes are still performed locally after retrieving the results
- * from the SparkExecutors.
+ * This implementation uses the metadata tables for the table being expired to list all Manifest and DataFiles. This
+ * is made into a DataFrame which is anti-joined with the same list read after the expiration. This operation will
+ * require a shuffle so parallelism can be controlled through spark.sql.shuffle.partitions. The expiration is done
+ * locally using a direct call to RemoveSnapshots. The snapshot expiration will be fully committed before any deletes
+ * are issued. Deletes are still performed locally after retrieving the results from the Spark executors.
  */
 public class ExpireSnapshotsAction extends BaseAction {
   private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
index a085cd99a3f6..1ddec6aff00f 100644
--- a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
+++ b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
@@ -136,7 +136,6 @@ private void checkExpirationResults(Long expectedDatafiles, Long expectedManifes
     expectedManifestListsDeleted, results.getManifestListsDeleted());
   }
-
   @Test
   public void testFilesCleaned() throws Exception {
     Dataset<Row> df = buildDF(RECORDS);

From a50e3e8afe7dd209e0d17404654c7c00a547ceb1 Mon Sep 17 00:00:00 2001
From: Russell Spitzer
Date: Mon, 10 Aug 2020 10:24:47 -0500
Subject: [PATCH 5/6] Change Log Level of File Delete Count to INFO

---
 .../java/org/apache/iceberg/actions/ExpireSnapshotsAction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
index 70231a8b91b1..ecb09bda0436 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -223,7 +223,7 @@ private ExpireSnapshotsActionResult deleteFiles(Iterator expiredFiles) {
       break;
     }
   });
-    LOG.debug("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
+    LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
     return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
   }
 }

From 1732bc27804666d2a7dc7907331e2d5000a66a98 Mon Sep 17 00:00:00 2001
From: Russell Spitzer
Date: Mon, 10 Aug 2020 18:03:59 -0500
Subject: [PATCH 6/6] Reviewer Comments

Refactoring of tests: all tests use only table operations, no Spark writes
All tests now check file deletions
Renaming of class methods to fit style
Removal of TableUtil; functions moved back into BaseAction
ExpireSnapshotsAction defaults to a single-threaded deleter

---
 .../org/apache/iceberg/util/TableUtil.java    | 66 ----
 .../apache/iceberg/actions/BaseAction.java    | 43 ++-
 .../actions/ExpireSnapshotsAction.java        | 
35 +- .../actions/ExpireSnapshotsActionResult.java | 6 +- .../actions/TestExpireSnapshotsAction.java | 308 +++++++----------- 5 files changed, 175 insertions(+), 283 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/util/TableUtil.java diff --git a/core/src/main/java/org/apache/iceberg/util/TableUtil.java b/core/src/main/java/org/apache/iceberg/util/TableUtil.java deleted file mode 100644 index 10b30f488123..000000000000 --- a/core/src/main/java/org/apache/iceberg/util/TableUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.util; - -import java.util.List; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; - -public class TableUtil { - - private TableUtil(){} - - /** - * Returns all the path locations of all Manifest Lists for a given table - * @param table the table - * @return the paths of the Manifest Lists - */ - public static List getManifestListPaths(Table table) { - List manifestLists = Lists.newArrayList(); - for (Snapshot snapshot : table.snapshots()) { - String manifestListLocation = snapshot.manifestListLocation(); - if (manifestListLocation != null) { - manifestLists.add(manifestListLocation); - } - } - return manifestLists; - } - - /** - * Returns all Metadata file paths which may not be in the current metadata. Specifically - * this includes "version-hint" files as well as entries in metadata.previousFiles. 
- * @param ops TableOperations for the table we will be getting paths from - * @return a list of paths to metadata files - */ - public static List getOtherMetadataFilePaths(TableOperations ops) { - List otherMetadataFiles = Lists.newArrayList(); - otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); - - TableMetadata metadata = ops.current(); - otherMetadataFiles.add(metadata.metadataFileLocation()); - for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { - otherMetadataFiles.add(previousMetadataFile.file()); - } - return otherMetadataFiles; - } -} diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java index abdb709cafb2..570fb5955986 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java @@ -21,15 +21,16 @@ import java.util.List; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.util.TableUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; - abstract class BaseAction implements Action { protected abstract Table table(); @@ -50,6 +51,40 @@ protected String metadataTableName(MetadataTableType type) { } } + /** + * Returns all the path locations of all Manifest Lists for a given table + * @param table the table + * @return the paths of the Manifest Lists + */ + protected List getManifestListPaths(Table table) { + List manifestLists = Lists.newArrayList(); + for (Snapshot snapshot : table.snapshots()) { + String manifestListLocation = snapshot.manifestListLocation(); + if (manifestListLocation != null) { + manifestLists.add(manifestListLocation); + } + } + return manifestLists; + } + + /** + * Returns all Metadata file paths which may not be in the current metadata. Specifically + * this includes "version-hint" files as well as entries in metadata.previousFiles. 
+ * @param ops TableOperations for the table we will be getting paths from + * @return a list of paths to metadata files + */ + protected List getOtherMetadataFilePaths(TableOperations ops) { + List otherMetadataFiles = Lists.newArrayList(); + otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text")); + + TableMetadata metadata = ops.current(); + otherMetadataFiles.add(metadata.metadataFileLocation()); + for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) { + otherMetadataFiles.add(previousMetadataFile.file()); + } + return otherMetadataFiles; + } + protected Dataset buildValidDataFileDF(SparkSession spark) { String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES); return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path"); @@ -61,12 +96,12 @@ protected Dataset buildManifestFileDF(SparkSession spark) { } protected Dataset buildManifestListDF(SparkSession spark, Table table) { - List manifestLists = TableUtil.getManifestListPaths(table); + List manifestLists = getManifestListPaths(table); return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path"); } protected Dataset buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) { - List otherMetadataFiles = TableUtil.getOtherMetadataFilePaths(ops); + List otherMetadataFiles = getOtherMetadataFilePaths(ops); return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path"); } diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java index ecb09bda0436..79fc00ad2486 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java @@ -20,6 +20,7 @@ package org.apache.iceberg.actions; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -29,7 +30,7 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Tasks; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; @@ -40,8 +41,8 @@ import org.slf4j.LoggerFactory; /** - * An action which performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark - * to to determine the delta in files between the pre and post-expiration table metadata. All of the same + * An action which performs the same operation as {@link ExpireSnapshots} but uses Spark + * to determine the delta in files between the pre and post-expiration table metadata. All of the same * restrictions of Remove Snapshots also apply to this action. *
<p>
 * This implementation uses the metadata tables for the table being expired to list all Manifest and DataFiles. This
 * is made into a DataFrame which is anti-joined with the same list read after the expiration. This operation will
@@ -58,7 +59,7 @@ public class ExpireSnapshotsAction extends BaseAction {
-  private Long expireSnapshotIdValue = null;
+  private final Set<Long> expireSnapshotIdValues = Sets.newHashSet();
   private Long expireOlderThanValue = null;
   private Integer retainLastValue = null;
   private Consumer<String> deleteFunc = defaultDelete;
@@ -88,8 +89,8 @@ protected Table table() {
   }

   /**
-   * An executor service used when deleting files. Only used during the local delete phase of this Spark action
-   * Similar to {@link org.apache.iceberg.ExpireSnapshots#executeWith(ExecutorService)}
+   * An executor service used when deleting files. Only used during the local delete phase of this Spark action.
+   * Similar to {@link ExpireSnapshots#executeWith(ExecutorService)}
    * @param executorService the service to use
    * @return this for method chaining
    */
@@ -100,18 +101,18 @@ public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService)

   /**
    * A specific snapshot to expire.
-   * Identical to {@link org.apache.iceberg.ExpireSnapshots#expireSnapshotId(long)}
+   * Identical to {@link ExpireSnapshots#expireSnapshotId(long)}
    * @param expireSnapshotId Id of the snapshot to expire
    * @return this for method chaining
    */
   public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
-    this.expireSnapshotIdValue = expireSnapshotId;
+    this.expireSnapshotIdValues.add(expireSnapshotId);
     return this;
   }

   /**
    * Expire all snapshots older than a given timestamp.
-   * Identical to {@link org.apache.iceberg.ExpireSnapshots#expireOlderThan(long)}
+   * Identical to {@link ExpireSnapshots#expireOlderThan(long)}
    * @param timestampMillis all snapshots before this time will be expired
    * @return this for method chaining
    */
@@ -122,7 +123,7 @@ public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {

   /**
    * Retain at least x snapshots when expiring
-   * Identical to {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}
+   * Identical to {@link ExpireSnapshots#retainLast(int)}
    * @param numSnapshots number of snapshots to leave
    * @return this for method chaining
    */
@@ -135,7 +136,7 @@ public ExpireSnapshotsAction retainLast(int numSnapshots) {

   /**
    * The Consumer used on files which have been determined to be expired. By default uses a filesystem delete.
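   * (Tests in this series rely on this hook: passing a collecting Consumer such as
   * deleteWith(deletedFiles::add) captures the candidate paths in a Set, so assertions
   * can inspect exactly what was considered expired without touching the filesystem.)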
- * Identical to {@link org.apache.iceberg.ExpireSnapshots#deleteWith(Consumer)} + * Identical to {@link ExpireSnapshots#deleteWith(Consumer)} * @param newDeleteFunc Consumer which takes a path and deletes it * @return this for method chaining */ @@ -155,23 +156,25 @@ public ExpireSnapshotsActionResult execute() { // Perform Expiration ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false); - if (expireSnapshotIdValue != null) { - expireSnaps = expireSnaps.expireSnapshotId(expireSnapshotIdValue); + for (final Long id : expireSnapshotIdValues) { + expireSnaps = expireSnaps.expireSnapshotId(id); } + if (expireOlderThanValue != null) { expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue); } + if (retainLastValue != null) { expireSnaps = expireSnaps.retainLast(retainLastValue); } + expireSnaps.commit(); // Metadata after Expiration Dataset validFiles = buildValidFileDF(); Dataset filesToDelete = originalFiles.except(validFiles); - ExpireSnapshotsActionResult result = deleteFiles(filesToDelete.toLocalIterator()); - return result; + return deleteFiles(filesToDelete.toLocalIterator()); } finally { if (originalFiles != null) { originalFiles.unpersist(); diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java index 609e72532275..adb74bcdfc96 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java @@ -31,15 +31,15 @@ public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDele this.manifestListsDeleted = manifestListsDeleted; } - public Long getDataFilesDeleted() { + public Long dataFilesDeleted() { return dataFilesDeleted; } - public Long getManifestFilesDeleted() { + public Long manifestFilesDeleted() { return manifestFilesDeleted; } - public Long getManifestListsDeleted() { + public Long manifestListsDeleted() { return manifestListsDeleted; } diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java index 1ddec6aff00f..7a44ae1f9d12 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java +++ b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java @@ -21,8 +21,6 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -30,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.BaseTable; @@ -49,10 +46,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; -import org.apache.iceberg.spark.source.ThreeColumnRecord; import org.apache.iceberg.types.Types; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -72,8 +66,6 @@ public abstract class TestExpireSnapshotsAction extends SparkTestBase { private static final PartitionSpec SPEC = 
PartitionSpec.builderFor(SCHEMA).identity("c1").build(); - private static final List RECORDS = Lists.newArrayList(new ThreeColumnRecord(1, "AAAA", "AAAA")); - static final DataFile FILE_A = DataFiles.builder(SPEC) .withPath("/path/to/data-a.parquet") .withFileSizeInBytes(10) @@ -113,63 +105,47 @@ public void setupTableLocation() throws Exception { this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation); } - private Dataset buildDF(List records) { - return spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); - } - - private void writeDF(Dataset df, String mode) { - df.select("c1", "c2", "c3") - .write() - .format("iceberg") - .mode(mode) - .save(tableLocation); + private Long rightAfterSnapshot() { + Long end = System.currentTimeMillis(); + while (end <= table.currentSnapshot().timestampMillis()) { + end = System.currentTimeMillis(); + } + return end; } private void checkExpirationResults(Long expectedDatafiles, Long expectedManifestsDeleted, Long expectedManifestListsDeleted, ExpireSnapshotsActionResult results) { Assert.assertEquals("Incorrect number of manifest files deleted", - expectedManifestsDeleted, results.getManifestFilesDeleted()); + expectedManifestsDeleted, results.manifestFilesDeleted()); Assert.assertEquals("Incorrect number of datafiles deleted", - expectedDatafiles, results.getDataFilesDeleted()); + expectedDatafiles, results.dataFilesDeleted()); Assert.assertEquals("Incorrect number of manifest lists deleted", - expectedManifestListsDeleted, results.getManifestListsDeleted()); + expectedManifestListsDeleted, results.manifestListsDeleted()); } @Test public void testFilesCleaned() throws Exception { - Dataset df = buildDF(RECORDS); - - writeDF(df, "append"); - - List expiredDataFiles = Files - .list(tableDir.toPath().resolve("data").resolve("c1=1")) - .collect(Collectors.toList()); - - Assert.assertEquals("There should be a data file to delete but there was none.", - 2, expiredDataFiles.size()); + table.newFastAppend() + .appendFile(FILE_A) + .commit(); - writeDF(df, "overwrite"); - writeDF(df, "append"); + table.newOverwrite() + .deleteFile(FILE_A) + .addFile(FILE_B) + .commit(); - long end = System.currentTimeMillis(); - while (end <= table.currentSnapshot().timestampMillis()) { - end = System.currentTimeMillis(); - } + table.newFastAppend() + .appendFile(FILE_C) + .commit(); - ExpireSnapshotsActionResult results = - Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); + long end = rightAfterSnapshot(); - table.refresh(); + ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots())); - for (Path p : expiredDataFiles) { - Assert.assertFalse(String.format("File %s still exists but should have been deleted", p), - Files.exists(p)); - } - - checkExpirationResults(1L, 2L, 2L, results); + checkExpirationResults(1L, 1L, 2L, results); } @Test @@ -186,23 +162,18 @@ public void dataFilesCleanupWithParallelTasks() throws IOException { table.newRewrite() .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D)) .commit(); - long thirdSnapshotId = table.currentSnapshot().snapshotId(); table.newRewrite() .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C)) .commit(); - long fourthSnapshotId = table.currentSnapshot().snapshotId(); - long t4 = System.currentTimeMillis(); - while (t4 <= table.currentSnapshot().timestampMillis()) { - t4 = 
System.currentTimeMillis(); - } + long t4 = rightAfterSnapshot(); Set deletedFiles = Sets.newHashSet(); Set deleteThreads = ConcurrentHashMap.newKeySet(); AtomicInteger deleteThreadsIndex = new AtomicInteger(0); - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .executeDeleteWith(Executors.newFixedThreadPool(4, runnable -> { Thread thread = new Thread(runnable); thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement()); @@ -222,44 +193,45 @@ public void dataFilesCleanupWithParallelTasks() throws IOException { Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); + + checkExpirationResults(2L, 3L, 3L, result); } @Test public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { - Dataset df = buildDF(RECORDS); - - writeDF(df, "append"); - - ExpireSnapshotsActionResult results = - Actions.forTable(table).expireSnapshots().execute(); + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().execute(); checkExpirationResults(0L, 0L, 0L, results); } @Test public void testCleanupRepeatedOverwrites() throws Exception { - Dataset df = buildDF(RECORDS); - - writeDF(df, "append"); + table.newFastAppend() + .appendFile(FILE_A) + .commit(); for (int i = 0; i < 10; i++) { - writeDF(df, "overwrite"); + table.newOverwrite() + .deleteFile(FILE_A) + .addFile(FILE_B) + .commit(); + + table.newOverwrite() + .deleteFile(FILE_B) + .addFile(FILE_A) + .commit(); } - long end = System.currentTimeMillis(); - while (end <= table.currentSnapshot().timestampMillis()) { - end = System.currentTimeMillis(); - } - - ExpireSnapshotsActionResult results = - Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); - - checkExpirationResults(10L, 19L, 10L, results); + long end = rightAfterSnapshot(); + ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); + checkExpirationResults(1L, 39L, 20L, results); } @Test public void testRetainLastWithExpireOlderThan() { - long t0 = System.currentTimeMillis(); table.newAppend() .appendFile(FILE_A) // data_bucket=0 .commit(); @@ -273,19 +245,11 @@ public void testRetainLastWithExpireOlderThan() { .appendFile(FILE_B) // data_bucket=1 .commit(); - long t2 = System.currentTimeMillis(); - while (t2 <= table.currentSnapshot().timestampMillis()) { - t2 = System.currentTimeMillis(); - } - table.newAppend() .appendFile(FILE_C) // data_bucket=2 .commit(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } + long t3 = rightAfterSnapshot(); // Retain last 2 snapshots Actions.forTable(table).expireSnapshots() @@ -293,224 +257,180 @@ public void testRetainLastWithExpireOlderThan() { .retainLast(2) .execute(); - Assert.assertEquals("Should have two snapshots.", - 2, Lists.newArrayList(table.snapshots()).size()); - Assert.assertEquals("First snapshot should not present.", - null, table.snapshot(firstSnapshotId)); + Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId)); } @Test - public void testRetainLastWithExpireById() { - long t0 = System.currentTimeMillis(); + public void 
testExpireTwoSnapshotsById() throws Exception { table.newAppend() .appendFile(FILE_A) // data_bucket=0 .commit(); long firstSnapshotId = table.currentSnapshot().snapshotId(); - long t1 = System.currentTimeMillis(); - while (t1 <= table.currentSnapshot().timestampMillis()) { - t1 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_B) // data_bucket=1 .commit(); - long t2 = System.currentTimeMillis(); - while (t2 <= table.currentSnapshot().timestampMillis()) { - t2 = System.currentTimeMillis(); - } + long secondSnapshotID = table.currentSnapshot().snapshotId(); table.newAppend() .appendFile(FILE_C) // data_bucket=2 .commit(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } + // Retain last 2 snapshots + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() + .expireSnapshotId(firstSnapshotId) + .expireSnapshotId(secondSnapshotID) + .execute(); + + Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId)); + Assert.assertEquals("Second snapshot should not be present.", null, table.snapshot(secondSnapshotID)); + + checkExpirationResults(0L, 0L, 2L, result); + } + + @Test + public void testRetainLastWithExpireById() { + table.newAppend() + .appendFile(FILE_A) // data_bucket=0 + .commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + + table.newAppend() + .appendFile(FILE_B) // data_bucket=1 + .commit(); + + table.newAppend() + .appendFile(FILE_C) // data_bucket=2 + .commit(); // Retain last 3 snapshots, but explicitly remove the first snapshot - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireSnapshotId(firstSnapshotId) .retainLast(3) .execute(); - Assert.assertEquals("Should have two snapshots.", - 2, Lists.newArrayList(table.snapshots()).size()); - Assert.assertEquals("First snapshot should not present.", - null, table.snapshot(firstSnapshotId)); + Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId)); + checkExpirationResults(0L, 0L, 1L, result); } @Test public void testRetainLastWithTooFewSnapshots() { - long t0 = System.currentTimeMillis(); table.newAppend() .appendFile(FILE_A) // data_bucket=0 .appendFile(FILE_B) // data_bucket=1 .commit(); long firstSnapshotId = table.currentSnapshot().snapshotId(); - long t1 = System.currentTimeMillis(); - while (t1 <= table.currentSnapshot().timestampMillis()) { - t1 = System.currentTimeMillis(); - } - table.newAppend() .appendFile(FILE_C) // data_bucket=2 .commit(); - long t2 = System.currentTimeMillis(); - while (t2 <= table.currentSnapshot().timestampMillis()) { - t2 = System.currentTimeMillis(); - } + long t2 = rightAfterSnapshot(); // Retain last 3 snapshots - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireOlderThan(t2) .retainLast(3) .execute(); - Assert.assertEquals("Should have two snapshots", - 2, Lists.newArrayList(table.snapshots()).size()); + Assert.assertEquals("Should have two snapshots", 2, Lists.newArrayList(table.snapshots()).size()); Assert.assertEquals("First snapshot should still present", firstSnapshotId, 
table.snapshot(firstSnapshotId).snapshotId()); + checkExpirationResults(0L, 0L, 0L, result); } @Test public void testRetainLastKeepsExpiringSnapshot() { - long t0 = System.currentTimeMillis(); table.newAppend() .appendFile(FILE_A) // data_bucket=0 .commit(); - long t1 = System.currentTimeMillis(); - while (t1 <= table.currentSnapshot().timestampMillis()) { - t1 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_B) // data_bucket=1 .commit(); Snapshot secondSnapshot = table.currentSnapshot(); - long t2 = System.currentTimeMillis(); - while (t2 <= table.currentSnapshot().timestampMillis()) { - t2 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_C) // data_bucket=2 .commit(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } - table.newAppend() .appendFile(FILE_D) // data_bucket=3 .commit(); - long t4 = System.currentTimeMillis(); - while (t4 <= table.currentSnapshot().timestampMillis()) { - t4 = System.currentTimeMillis(); - } - // Retain last 2 snapshots and expire older than t3 - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireOlderThan(secondSnapshot.timestampMillis()) .retainLast(2) .execute(); - Assert.assertEquals("Should have three snapshots.", - 3, Lists.newArrayList(table.snapshots()).size()); - Assert.assertNotNull("Second snapshot should present.", - table.snapshot(secondSnapshot.snapshotId())); + Assert.assertEquals("Should have three snapshots.", 3, Lists.newArrayList(table.snapshots()).size()); + Assert.assertNotNull("Second snapshot should present.", table.snapshot(secondSnapshot.snapshotId())); + checkExpirationResults(0L, 0L, 1L, result); } @Test public void testExpireOlderThanMultipleCalls() { - long t0 = System.currentTimeMillis(); table.newAppend() .appendFile(FILE_A) // data_bucket=0 .commit(); - long t1 = System.currentTimeMillis(); - while (t1 <= table.currentSnapshot().timestampMillis()) { - t1 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_B) // data_bucket=1 .commit(); Snapshot secondSnapshot = table.currentSnapshot(); - long t2 = System.currentTimeMillis(); - while (t2 <= table.currentSnapshot().timestampMillis()) { - t2 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_C) // data_bucket=2 .commit(); Snapshot thirdSnapshot = table.currentSnapshot(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } // Retain last 2 snapshots and expire older than t3 - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireOlderThan(secondSnapshot.timestampMillis()) .expireOlderThan(thirdSnapshot.timestampMillis()) .execute(); - Assert.assertEquals("Should have one snapshots.", - 1, Lists.newArrayList(table.snapshots()).size()); - Assert.assertNull("Second snapshot should not present.", - table.snapshot(secondSnapshot.snapshotId())); + Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size()); + Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId())); + checkExpirationResults(0L, 0L, 2L, result); } @Test public void testRetainLastMultipleCalls() { - long t0 = System.currentTimeMillis(); table.newAppend() .appendFile(FILE_A) // data_bucket=0 .commit(); - long t1 = System.currentTimeMillis(); 
- while (t1 <= table.currentSnapshot().timestampMillis()) { - t1 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_B) // data_bucket=1 .commit(); Snapshot secondSnapshot = table.currentSnapshot(); - long t2 = System.currentTimeMillis(); - while (t2 <= table.currentSnapshot().timestampMillis()) { - t2 = System.currentTimeMillis(); - } table.newAppend() .appendFile(FILE_C) // data_bucket=2 .commit(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } + long t3 = rightAfterSnapshot(); // Retain last 2 snapshots and expire older than t3 - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireOlderThan(t3) .retainLast(2) .retainLast(1) .execute(); - Assert.assertEquals("Should have one snapshots.", - 1, Lists.newArrayList(table.snapshots()).size()); - Assert.assertNull("Second snapshot should not present.", - table.snapshot(secondSnapshot.snapshotId())); + Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size()); + Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId())); + checkExpirationResults(0L, 0L, 2L, result); } @Test @@ -538,20 +458,17 @@ public void testScanExpiredManifestInValidSnapshotAppend() { .appendFile(FILE_D) .commit(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } + long t3 = rightAfterSnapshot(); Set deletedFiles = Sets.newHashSet(); - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireOlderThan(t3) .deleteWith(deletedFiles::add) .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); - + checkExpirationResults(1L, 1L, 2L, result); } @Test @@ -575,19 +492,17 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() { .appendFile(FILE_D) .commit(); - long t3 = System.currentTimeMillis(); - while (t3 <= table.currentSnapshot().timestampMillis()) { - t3 = System.currentTimeMillis(); - } + long t3 = rightAfterSnapshot(); Set deletedFiles = Sets.newHashSet(); - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .expireOlderThan(t3) .deleteWith(deletedFiles::add) .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); + checkExpirationResults(1L, 1L, 2L, result); } /** @@ -620,11 +535,13 @@ public void testWithExpiringDanglingStageCommit() { Set deletedFiles = new HashSet<>(); // Expire all commits including dangling staged snapshot. 
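    // (The cutoff snapshotB.timestampMillis() + 1 covers snapshots A and B;
    // C remains the current snapshot and is always retained.)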
- Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .deleteWith(deletedFiles::add) .expireOlderThan(snapshotB.timestampMillis() + 1) .execute(); + checkExpirationResults(1L, 1L, 2L, result); + Set expectedDeletes = new HashSet<>(); expectedDeletes.add(snapshotA.manifestListLocation()); @@ -697,7 +614,7 @@ public void testWithCherryPickTableSnapshot() { List deletedFiles = new ArrayList<>(); // Expire `C` - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult result = Actions.forTable(table).expireSnapshots() .deleteWith(deletedFiles::add) .expireOlderThan(snapshotC.timestampMillis() + 1) .execute(); @@ -708,6 +625,8 @@ public void testWithCherryPickTableSnapshot() { Assert.assertFalse(deletedFiles.contains(item.path().toString())); }); }); + + checkExpirationResults(1L, 2L, 2L, result); } /** @@ -750,7 +669,7 @@ public void testWithExpiringStagedThenCherrypick() { List deletedFiles = new ArrayList<>(); // Expire `B` commit. - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult firstResult = Actions.forTable(table).expireSnapshots() .deleteWith(deletedFiles::add) .expireSnapshotId(snapshotB.snapshotId()) .execute(); @@ -761,9 +680,10 @@ public void testWithExpiringStagedThenCherrypick() { Assert.assertFalse(deletedFiles.contains(item.path().toString())); }); }); + checkExpirationResults(0L, 1L, 1L, firstResult); // Expire all snapshots including cherry-pick - Actions.forTable(table).expireSnapshots() + ExpireSnapshotsActionResult secondResult = Actions.forTable(table).expireSnapshots() .deleteWith(deletedFiles::add) .expireOlderThan(table.currentSnapshot().timestampMillis() + 1) .execute(); @@ -774,7 +694,7 @@ public void testWithExpiringStagedThenCherrypick() { Assert.assertFalse(deletedFiles.contains(item.path().toString())); }); }); + checkExpirationResults(0L, 0L, 2L, secondResult); } - }
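Taken together, the series leaves callers with the fluent surface below. This is a sketch, not part of the patch: it assumes an already-loaded Iceberg Table handle and Spark on the classpath, and the thread pool is optional since the final commit makes single-threaded deletes the default.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.Actions;
    import org.apache.iceberg.actions.ExpireSnapshotsActionResult;

    public class ExpireSnapshotsExample {
      // 'table' is assumed to be a loaded Iceberg Table; obtaining it is out of scope here.
      static ExpireSnapshotsActionResult expireOlderThanOneWeek(Table table) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // optional parallel delete phase
        try {
          return Actions.forTable(table).expireSnapshots()
              .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
              .retainLast(10)           // keep at least the 10 most recent snapshots
              .executeDeleteWith(pool)  // omit to use the default single-threaded deleter
              .execute();
        } finally {
          pool.shutdown();
        }
      }
    }

The renamed result accessors (dataFilesDeleted(), manifestFilesDeleted(), manifestListsDeleted()) report what the anti-join produced: the action materializes buildValidFileDF() before expiring, commits the expiration through RemoveSnapshots with cleanExpiredFiles(false), recomputes the valid set, and deletes originalFiles.except(validFiles).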