diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index b09d12ff6344..e1f1ff0146e3 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -24,14 +24,26 @@
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.junit.After;
 import org.junit.Assert;
@@ -224,4 +236,80 @@ public void testConcurrentExpireSnapshotsWithInvalidInput() {
             catalogName, tableIdent, -1));
 
   }
+
+  /**
+   * Checks that expire_snapshots deletes the delete manifest and delete file from storage once they are no
+   * longer referenced by the table metadata.
+   */
+  @Test
+  public void testExpireDeleteFiles() throws Exception {
+    // format v2 + merge-on-read: the DELETE below is expected to produce a delete file (asserted further down)
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES" +
+        "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "d")
+    );
+    spark.createDataset(records, Encoders.bean(SimpleRecord.class)).coalesce(1).writeTo(tableName).append();
+    sql("DELETE FROM %s WHERE id=1", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    Assert.assertEquals("Should have 1 delete manifest", 1, deleteManifests(table).size());
+    Assert.assertEquals("Should have 1 delete file", 1, deleteFiles(table).size());
+    // capture the on-disk locations now; the metadata will stop referencing them later in the test
+    Path deleteManifestPath = new Path(deleteManifests(table).iterator().next().path());
+    Path deleteFilePath = new Path(String.valueOf(deleteFiles(table).iterator().next().path()));
+
+    // rewrite data files that have at least one associated delete file
+    sql("CALL %s.system.rewrite_data_files(" +
+        "table => '%s'," +
+        "options => map(" +
+        "'delete-file-threshold','1'," +
+        "'use-starting-sequence-number', 'false'))",
+        catalogName, tableIdent);
+    table.refresh();
+
+    sql("INSERT INTO TABLE %s VALUES (5, 'e')", tableName); // this txn moves the file to the DELETED state
+    sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName); // this txn removes the file reference
+    table.refresh();
+
+    Assert.assertEquals("Should have no delete manifests", 0, deleteManifests(table).size());
+    Assert.assertEquals("Should have no delete files", 0, deleteFiles(table).size());
+
+    // the current snapshot no longer references them, but the files must remain until snapshots are expired
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    Assert.assertTrue("Delete manifest should still exist", localFs.exists(deleteManifestPath));
+    Assert.assertTrue("Delete file should still exist", localFs.exists(deleteFilePath));
+
+    // expire everything older than now, keeping only the most recent snapshot
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    sql("CALL %s.system.expire_snapshots(" +
+        "older_than => TIMESTAMP '%s'," +
+        "table => '%s'," +
+        "retain_last => 1)",
+        catalogName, currentTimestamp, tableIdent);
+
+    Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
+    Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
+  }
+
+  /** Returns the delete manifests of the table's current snapshot. */
+  private List<ManifestFile> deleteManifests(Table table) {
+    return table.currentSnapshot().deleteManifests();
+  }
+
+  /** Collects the distinct delete files attached to the tasks of a full table scan. */
+  private Set<DeleteFile> deleteFiles(Table table) {
+    Set<DeleteFile> deleteFiles = Sets.newHashSet();
+
+    for (FileScanTask task : table.newScan().planFiles()) {
+      deleteFiles.addAll(task.deletes());
+    }
+
+    return deleteFiles;
+  }
 }