diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index a19f91a47bd7..a992ca3f0449 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -27,6 +27,12 @@ acceptedBreaks:
   - code: "java.method.addedToInterface"
     new: "method ThisT org.apache.iceberg.SnapshotUpdate::scanManifestsWith(java.util.concurrent.ExecutorService)"
     justification: "Accept all changes prior to introducing API compatibility checks"
+  - code: "java.method.addedToInterface"
+    new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedEqualityDeleteFilesCount()"
+    justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
+  - code: "java.method.addedToInterface"
+    new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()"
+    justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
   - code: "java.method.addedToInterface"
     new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)"
     justification: "Accept all changes prior to introducing API compatibility checks"
diff --git a/api/src/main/java/org/apache/iceberg/ManifestContent.java b/api/src/main/java/org/apache/iceberg/ManifestContent.java
index b4b4473ef32c..1c32b9915682 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestContent.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestContent.java
@@ -35,4 +35,12 @@ public enum ManifestContent {
   public int id() {
     return id;
   }
+
+  public static ManifestContent fromId(int id) {
+    switch (id) {
+      case 0: return DATA;
+      case 1: return DELETES;
+    }
+    throw new IllegalArgumentException("Unknown manifest content: " + id);
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
index 2c7d274cbf63..b614dcb224ec 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
@@ -101,6 +101,16 @@ interface Result {
      */
     long deletedDataFilesCount();
 
+    /**
+     * Returns the number of deleted equality delete files.
+     */
+    long deletedEqualityDeleteFilesCount();
+
+    /**
+     * Returns the number of deleted position delete files.
+     */
+    long deletedPositionDeleteFilesCount();
+
     /**
      * Returns the number of deleted manifests.
      */
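[Reviewer note] A minimal usage sketch of the extended Result interface (illustrative only; it assumes an already-loaded Table and uses the SparkActions entry point exercised by the tests later in this PR):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;

    class ExpireSnapshotsResultDemo {
      // Expire snapshots on a table and read the per-content-type counters.
      static void printDeleteFileCounts(Table table) {
        ExpireSnapshots.Result result = SparkActions.get()
            .expireSnapshots(table)
            .expireOlderThan(System.currentTimeMillis())
            .execute();

        System.out.println("data files deleted: " + result.deletedDataFilesCount());
        // The two counters added by this PR:
        System.out.println("position delete files deleted: " + result.deletedPositionDeleteFilesCount());
        System.out.println("equality delete files deleted: " + result.deletedEqualityDeleteFilesCount());
      }
    }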
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
index 5b10461b1bf4..32c6be5ae103 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
@@ -22,6 +22,8 @@ public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result {
 
   private final long deletedDataFilesCount;
+  private final long deletedPosDeleteFilesCount;
+  private final long deletedEqDeleteFilesCount;
   private final long deletedManifestsCount;
   private final long deletedManifestListsCount;
 
@@ -29,6 +31,20 @@ public BaseExpireSnapshotsActionResult(long deletedDataFilesCount,
                                          long deletedManifestsCount,
                                          long deletedManifestListsCount) {
     this.deletedDataFilesCount = deletedDataFilesCount;
+    this.deletedPosDeleteFilesCount = 0;
+    this.deletedEqDeleteFilesCount = 0;
+    this.deletedManifestsCount = deletedManifestsCount;
+    this.deletedManifestListsCount = deletedManifestListsCount;
+  }
+
+  public BaseExpireSnapshotsActionResult(long deletedDataFilesCount,
+                                         long deletedPosDeleteFilesCount,
+                                         long deletedEqDeleteFilesCount,
+                                         long deletedManifestsCount,
+                                         long deletedManifestListsCount) {
+    this.deletedDataFilesCount = deletedDataFilesCount;
+    this.deletedPosDeleteFilesCount = deletedPosDeleteFilesCount;
+    this.deletedEqDeleteFilesCount = deletedEqDeleteFilesCount;
     this.deletedManifestsCount = deletedManifestsCount;
     this.deletedManifestListsCount = deletedManifestListsCount;
   }
@@ -38,6 +54,16 @@ public long deletedDataFilesCount() {
     return deletedDataFilesCount;
   }
 
+  @Override
+  public long deletedPositionDeleteFilesCount() {
+    return deletedPosDeleteFilesCount;
+  }
+
+  @Override
+  public long deletedEqualityDeleteFilesCount() {
+    return deletedEqDeleteFilesCount;
+  }
+
   @Override
   public long deletedManifestsCount() {
     return deletedManifestsCount;
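[Reviewer note] A short compatibility sketch (illustrative only): the retained three-argument constructor pins both new counters to zero, so pre-existing callers keep compiling and observe the same values as before:

    import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;

    class ResultCompatDemo {
      public static void main(String[] args) {
        // Legacy three-argument constructor: delete file counters default to 0.
        BaseExpireSnapshotsActionResult legacy = new BaseExpireSnapshotsActionResult(2L, 3L, 4L);
        System.out.println(legacy.deletedPositionDeleteFilesCount()); // 0
        System.out.println(legacy.deletedEqualityDeleteFilesCount()); // 0

        // New five-argument constructor populates every counter explicitly.
        BaseExpireSnapshotsActionResult full = new BaseExpireSnapshotsActionResult(2L, 1L, 1L, 3L, 4L);
        System.out.println(full.deletedPositionDeleteFilesCount()); // 1
        System.out.println(full.deletedEqualityDeleteFilesCount()); // 1
      }
    }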
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 0f6d0f6fe136..a77dafe5af05 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -65,7 +65,7 @@ public void testExpireSnapshotsInEmptyTable() {
     List<Object[]> output = sql(
         "CALL %s.system.expire_snapshots('%s')", catalogName, tableIdent);
-    assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L)), output);
+    assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L)), output);
   }
 
   @Test
@@ -92,7 +92,7 @@ public void testExpireSnapshotsUsingPositionalArgs() {
         "CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')", catalogName, tableIdent, secondSnapshotTimestamp);
     assertEquals("Procedure output must match",
-        ImmutableList.of(row(0L, 0L, 1L)),
+        ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)),
         output1);
 
     table.refresh();
@@ -118,7 +118,7 @@
         "CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)", catalogName, tableIdent, currentTimestamp);
     assertEquals("Procedure output must match",
-        ImmutableList.of(row(2L, 2L, 1L)),
+        ImmutableList.of(row(2L, 0L, 0L, 2L, 1L)),
         output);
   }
@@ -144,7 +144,7 @@ public void testExpireSnapshotUsingNamedArgs() {
         "retain_last => 1)", catalogName, currentTimestamp, tableIdent);
     assertEquals("Procedure output must match",
-        ImmutableList.of(row(0L, 0L, 1L)),
+        ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)),
         output);
   }
@@ -213,7 +213,8 @@ public void testConcurrentExpireSnapshots() {
         "max_concurrent_deletes => %s," +
         "retain_last => 1)", catalogName, currentTimestamp, tableIdent, 4);
-    assertEquals("Expiring snapshots concurrently should succeed", ImmutableList.of(row(0L, 0L, 3L)), output);
+    assertEquals("Expiring snapshots concurrently should succeed",
+        ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)), output);
   }
 
   @Test
@@ -274,12 +275,14 @@ public void testExpireDeleteFiles() throws Exception {
     Assert.assertTrue("Delete file should still exist", localFs.exists(deleteFilePath));
 
     Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
-    sql("CALL %s.system.expire_snapshots(" +
+    List<Object[]> output = sql("CALL %s.system.expire_snapshots(" +
         "older_than => TIMESTAMP '%s'," +
         "table => '%s'," +
         "retain_last => 1)",
         catalogName, currentTimestamp, tableIdent);
+    assertEquals("Should delete 1 data file, 1 pos delete file, and 4 manifests and lists (one for each txn)",
+        ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)), output);
 
     Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
     Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
   }
@@ -306,7 +309,7 @@ public void testExpireSnapshotWithStreamResultsEnabled() {
         "retain_last => 1, " +
         "stream_results => true)", catalogName, currentTimestamp, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 1L)), output);
+    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
   }
 
   @Test
@@ -337,7 +340,7 @@ public void testExpireSnapshotsProcedureWorksWithSqlComments() {
     List<Object[]> output = sql(
         callStatement, catalogName, currentTimestamp, tableIdent);
     assertEquals("Procedure output must match",
-        ImmutableList.of(row(0L, 0L, 1L)),
+        ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)),
         output);
 
     table.refresh();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 310081d18edb..2216383337ef 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -215,7 +216,7 @@ private ExpireSnapshots.Result doExecute() {
 
   private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, table.io());
-    return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
+    return buildValidContentFileWithTypeDF(staticTable)
         .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
         .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
   }
@@ -228,6 +229,8 @@ private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
    */
   private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
     AtomicLong dataFileCount = new AtomicLong(0L);
+    AtomicLong posDeleteFileCount = new AtomicLong(0L);
+    AtomicLong eqDeleteFileCount = new AtomicLong(0L);
     AtomicLong manifestCount = new AtomicLong(0L);
     AtomicLong manifestListCount = new AtomicLong(0L);
@@ -243,23 +246,31 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
           String file = fileInfo.getString(0);
           String type = fileInfo.getString(1);
           deleteFunc.accept(file);
-          switch (type) {
-            case CONTENT_FILE:
-              dataFileCount.incrementAndGet();
-              LOG.trace("Deleted Content File: {}", file);
-              break;
-            case MANIFEST:
-              manifestCount.incrementAndGet();
-              LOG.debug("Deleted Manifest: {}", file);
-              break;
-            case MANIFEST_LIST:
-              manifestListCount.incrementAndGet();
-              LOG.debug("Deleted Manifest List: {}", file);
-              break;
+
+          if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+            dataFileCount.incrementAndGet();
+            LOG.trace("Deleted Data File: {}", file);
+          } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+            posDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Positional Delete File: {}", file);
+          } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+            eqDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Equality Delete File: {}", file);
+          } else if (MANIFEST.equals(type)) {
+            manifestCount.incrementAndGet();
+            LOG.debug("Deleted Manifest: {}", file);
+          } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
+            manifestListCount.incrementAndGet();
+            LOG.debug("Deleted Manifest List: {}", file);
+          } else {
+            throw new ValidationException("Illegal file type: %s", type);
           }
         });
 
-    LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
-    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
+    long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + eqDeleteFileCount.get();
+    LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() + manifestListCount.get());
+
+    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), posDeleteFileCount.get(),
+        eqDeleteFileCount.get(), manifestCount.get(), manifestListCount.get());
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index f8c5e454b0ca..99a026abf5f0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,21 +25,26 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.actions.Action;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.JobGroupUtils;
 import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.SparkUtil;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -48,6 +53,7 @@
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
 
 import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.spark.sql.functions.col;
@@ -122,18 +128,29 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
-    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
-    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileWithTypeDF(Table table) {
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
-        .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
+        .selectExpr(
+            "content",
+            "path",
+            "length",
+            "partition_spec_id as partitionSpecId",
+            "added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
        .as(Encoders.bean(ManifestFileBean.class));
 
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
+    return allManifests
+        .flatMap(new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+        .toDF(FILE_PATH, FILE_TYPE);
+  }
+
+  // builds a DF of delete and data file paths by reading all manifests
+  protected Dataset<Row> buildValidContentFileDF(Table table) {
+    return buildValidContentFileWithTypeDF(table).select(FILE_PATH);
   }
 
   protected Dataset<Row> buildManifestFileDF(Table table) {
@@ -176,16 +193,39 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator<>(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      FileIO io = table.getValue().io();
+      Map<Integer, PartitionSpec> specs = table.getValue().specs();
+      ImmutableList<String> projection = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
+
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, io, specs).select(projection).iterator(),
+              ReadManifest::contentFileWithType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, io, specs).select(projection).iterator(),
+              ReadManifest::contentFileWithType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content type: " + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileWithType(ContentFile<?> file) {
+      return new Tuple2<>(file.path().toString(), file.content().toString());
     }
   }
 }
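[Reviewer note] Tying the pieces together: ReadManifest.contentFileWithType emits FileContent enum names (DATA, POSITION_DELETES, EQUALITY_DELETES) in the FILE_TYPE column, which is exactly what the deleteFiles() branches in BaseExpireSnapshotsSparkAction match on, while manifests keep the existing MANIFEST/MANIFEST_LIST markers from withFileType. A tiny sketch of the string mapping (not part of the PR):

    import org.apache.iceberg.FileContent;
    import org.apache.iceberg.ManifestContent;

    class FileTypeStringsDemo {
      public static void main(String[] args) {
        // Type strings carried for content files, as produced by ContentFile#content():
        for (FileContent content : FileContent.values()) {
          System.out.println(content.name()); // DATA, POSITION_DELETES, EQUALITY_DELETES
        }

        // Manifest content ids resolved through the new factory method added above:
        System.out.println(ManifestContent.fromId(0)); // DATA
        System.out.println(ManifestContent.fromId(1)); // DELETES
      }
    }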
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 0837fb7d39e4..269130496dc9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -29,6 +29,7 @@ public class ManifestFileBean implements ManifestFile {
   private Long length = null;
   private Integer partitionSpecId = null;
   private Long addedSnapshotId = null;
+  private Integer content = null;
 
   public String getPath() {
     return path;
@@ -62,6 +63,14 @@ public void setAddedSnapshotId(Long addedSnapshotId) {
     this.addedSnapshotId = addedSnapshotId;
   }
 
+  public Integer getContent() {
+    return content;
+  }
+
+  public void setContent(Integer content) {
+    this.content = content;
+  }
+
   @Override
   public String path() {
     return path;
@@ -79,7 +88,7 @@ public int partitionSpecId() {
 
   @Override
   public ManifestContent content() {
-    return ManifestContent.DATA;
+    return ManifestContent.fromId(content);
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 8cf4d275cd9e..042fa6e4bb04 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -52,6 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
 
   private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
       new StructField("deleted_data_files_count", DataTypes.LongType, true, Metadata.empty()),
+      new StructField("deleted_position_delete_files_count", DataTypes.LongType, true, Metadata.empty()),
+      new StructField("deleted_equality_delete_files_count", DataTypes.LongType, true, Metadata.empty()),
       new StructField("deleted_manifest_files_count", DataTypes.LongType, true, Metadata.empty()),
       new StructField("deleted_manifest_lists_count", DataTypes.LongType, true, Metadata.empty())
   });
@@ -118,6 +120,8 @@ public InternalRow[] call(InternalRow args) {
   private InternalRow[] toOutputRows(ExpireSnapshots.Result result) {
     InternalRow row = newInternalRow(
         result.deletedDataFilesCount(),
+        result.deletedPositionDeleteFilesCount(),
+        result.deletedEqualityDeleteFilesCount(),
         result.deletedManifestsCount(),
         result.deletedManifestListsCount()
     );
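[Reviewer note] With the schema change above, the procedure now returns five columns. A hedged usage sketch (the catalog name my_catalog and table db.sample are placeholders; a configured SparkSession is assumed):

    import org.apache.spark.sql.SparkSession;

    class ExpireSnapshotsProcedureDemo {
      static void expire(SparkSession spark) {
        spark.sql(
            "CALL my_catalog.system.expire_snapshots(" +
            "table => 'db.sample', " +
            "older_than => TIMESTAMP '2021-12-01 00:00:00', " +
            "retain_last => 1)")
          .show();
        // Columns, in order: deleted_data_files_count,
        // deleted_position_delete_files_count, deleted_equality_delete_files_count,
        // deleted_manifest_files_count, deleted_manifest_lists_count
      }
    }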
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index c4445e95454e..d471c4062b6e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -26,11 +26,14 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -40,6 +43,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -94,6 +98,20 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
       .withPartitionPath("c1=3") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
+  static final DeleteFile FILE_A_POS_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+      .ofPositionDeletes()
+      .withPath("/path/to/data-a-pos-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("c1=0") // easy way to set partition data for now
+      .withRecordCount(1)
+      .build();
+  static final DeleteFile FILE_A_EQ_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+      .ofEqualityDeletes()
+      .withPath("/path/to/data-a-eq-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("c1=0") // easy way to set partition data for now
+      .withRecordCount(1)
+      .build();
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
@@ -122,13 +140,18 @@ private Long rightAfterSnapshot(long snapshotId) {
     return end;
   }
 
-  private void checkExpirationResults(long expectedDatafiles, long expectedManifestsDeleted,
-                                      long expectedManifestListsDeleted, ExpireSnapshots.Result results) {
+  private void checkExpirationResults(long expectedDatafiles, long expectedPosDeleteFiles, long expectedEqDeleteFiles,
+                                      long expectedManifestsDeleted, long expectedManifestListsDeleted,
+                                      ExpireSnapshots.Result results) {
 
     Assert.assertEquals("Incorrect number of manifest files deleted",
         expectedManifestsDeleted, results.deletedManifestsCount());
     Assert.assertEquals("Incorrect number of datafiles deleted",
         expectedDatafiles, results.deletedDataFilesCount());
+    Assert.assertEquals("Incorrect number of pos deletefiles deleted",
+        expectedPosDeleteFiles, results.deletedPositionDeleteFilesCount());
+    Assert.assertEquals("Incorrect number of eq deletefiles deleted",
+        expectedEqDeleteFiles, results.deletedEqualityDeleteFilesCount());
     Assert.assertEquals("Incorrect number of manifest lists deleted",
         expectedManifestListsDeleted, results.deletedManifestListsCount());
   }
@@ -154,7 +177,7 @@ public void testFilesCleaned() throws Exception {
 
     Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots()));
 
-    checkExpirationResults(1L, 1L, 2L, results);
+    checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);
   }
 
   @Test
@@ -203,7 +226,7 @@ public void dataFilesCleanupWithParallelTasks() throws IOException {
     Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
     Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
 
-    checkExpirationResults(2L, 3L, 3L, result);
+    checkExpirationResults(2L, 0L, 0L, 3L, 3L, result);
   }
 
   @Test
@@ -213,7 +236,7 @@ public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception {
         .commit();
 
     ExpireSnapshots.Result results = SparkActions.get().expireSnapshots(table).execute();
-    checkExpirationResults(0L, 0L, 0L, results);
+    checkExpirationResults(0L, 0L, 0L, 0L, 0L, results);
   }
 
   @Test
@@ -236,7 +259,7 @@ public void testCleanupRepeatedOverwrites() throws Exception {
     long end = rightAfterSnapshot();
     ExpireSnapshots.Result results = SparkActions.get().expireSnapshots(table).expireOlderThan(end).execute();
-    checkExpirationResults(1L, 39L, 20L, results);
+    checkExpirationResults(1L, 0L, 0L, 39L, 20L, results);
   }
 
   @Test
@@ -297,7 +320,7 @@ public void testExpireTwoSnapshotsById() throws Exception {
     Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId));
     Assert.assertEquals("Second snapshot should not be present.", null, table.snapshot(secondSnapshotID));
 
-    checkExpirationResults(0L, 0L, 2L, result);
+    checkExpirationResults(0L, 0L, 0L, 0L, 2L, result);
   }
 
   @Test
@@ -323,7 +346,7 @@ public void testRetainLastWithExpireById() {
     Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size());
     Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId));
 
-    checkExpirationResults(0L, 0L, 1L, result);
+    checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);
   }
 
   @Test
@@ -349,7 +372,7 @@ public void testRetainLastWithTooFewSnapshots() {
     Assert.assertEquals("Should have two snapshots", 2, Lists.newArrayList(table.snapshots()).size());
     Assert.assertEquals("First snapshot should still present",
         firstSnapshotId, table.snapshot(firstSnapshotId).snapshotId());
-    checkExpirationResults(0L, 0L, 0L, result);
+    checkExpirationResults(0L, 0L, 0L, 0L, 0L, result);
   }
 
   @Test
@@ -380,7 +403,7 @@ public void testRetainLastKeepsExpiringSnapshot() {
     Assert.assertEquals("Should have three snapshots.", 3, Lists.newArrayList(table.snapshots()).size());
     Assert.assertNotNull("Second snapshot should present.", table.snapshot(secondSnapshot.snapshotId()));
 
-    checkExpirationResults(0L, 0L, 1L, result);
+    checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);
   }
 
   @Test
@@ -424,7 +447,7 @@ public void testExpireOlderThanMultipleCalls() {
     Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size());
     Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId()));
 
-    checkExpirationResults(0L, 0L, 2L, result);
+    checkExpirationResults(0L, 0L, 0L, 0L, 2L, result);
   }
 
   @Test
@@ -454,7 +477,7 @@ public void testRetainLastMultipleCalls() {
     Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size());
     Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId()));
 
-    checkExpirationResults(0L, 0L, 2L, result);
+    checkExpirationResults(0L, 0L, 0L, 0L, 2L, result);
   }
 
   @Test
@@ -492,7 +515,7 @@ public void testScanExpiredManifestInValidSnapshotAppend() {
         .execute();
 
     Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
-    checkExpirationResults(1L, 1L, 2L, result);
+    checkExpirationResults(1L, 0L, 0L, 1L, 2L, result);
   }
 
   @Test
@@ -526,7 +549,7 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() {
         .execute();
 
     Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
-    checkExpirationResults(1L, 1L, 2L, result);
+    checkExpirationResults(1L, 0L, 0L, 1L, 2L, result);
   }
 
   /**
@@ -564,7 +587,7 @@ public void testWithExpiringDanglingStageCommit() {
         .expireOlderThan(snapshotB.timestampMillis() + 1)
         .execute();
 
-    checkExpirationResults(1L, 1L, 2L, result);
+    checkExpirationResults(1L, 0L, 0L, 1L, 2L, result);
 
     Set<String> expectedDeletes = Sets.newHashSet();
     expectedDeletes.add(snapshotA.manifestListLocation());
@@ -650,7 +673,7 @@ public void testWithCherryPickTableSnapshot() {
       });
     });
 
-    checkExpirationResults(1L, 2L, 2L, result);
+    checkExpirationResults(1L, 0L, 0L, 2L, 2L, result);
   }
 
   /**
@@ -704,7 +727,7 @@ public void testWithExpiringStagedThenCherrypick() {
        Assert.assertFalse(deletedFiles.contains(item.path().toString()));
      });
    });
-    checkExpirationResults(0L, 1L, 1L, firstResult);
+    checkExpirationResults(0L, 0L, 0L, 1L, 1L, firstResult);
 
     // Expire all snapshots including cherry-pick
     ExpireSnapshots.Result secondResult = SparkActions.get().expireSnapshots(table)
@@ -718,7 +741,7 @@ public void testWithExpiringStagedThenCherrypick() {
        Assert.assertFalse(deletedFiles.contains(item.path().toString()));
      });
    });
-    checkExpirationResults(0L, 0L, 2L, secondResult);
+    checkExpirationResults(0L, 0L, 0L, 0L, 2L, secondResult);
   }
 
   @Test
@@ -751,7 +774,7 @@ public void testExpireOlderThan() {
     Assert.assertEquals("Should remove only the expired manifest list location",
         Sets.newHashSet(firstSnapshot.manifestListLocation()), deletedFiles);
 
-    checkExpirationResults(0, 0, 1, result);
+    checkExpirationResults(0, 0, 0, 0, 1, result);
   }
 
   @Test
@@ -804,7 +827,7 @@ public void testExpireOlderThanWithDelete() {
             FILE_A.path()), // deleted
         deletedFiles);
 
-    checkExpirationResults(1, 2, 2, result);
+    checkExpirationResults(1, 0, 0, 2, 2, result);
   }
 
   @Test
@@ -862,7 +885,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
             FILE_A.path()), // deleted
         deletedFiles);
 
-    checkExpirationResults(1, 1, 2, result);
+    checkExpirationResults(1, 0, 0, 1, 2, result);
   }
 
   @Test
@@ -917,7 +940,7 @@ public void testExpireOlderThanWithRollback() {
             Iterables.getOnlyElement(secondSnapshotManifests).path()), // manifest is no longer referenced
         deletedFiles);
 
-    checkExpirationResults(0, 1, 1, result);
+    checkExpirationResults(0, 0, 0, 1, 1, result);
   }
 
   @Test
@@ -967,7 +990,77 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {
             FILE_B.path()), // added, but rolled back
         deletedFiles);
 
-    checkExpirationResults(1, 1, 1, result);
+    checkExpirationResults(1, 0, 0, 1, 1, result);
+  }
+
+  @Test
+  public void testExpireOlderThanWithDeleteFile() {
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .set(TableProperties.MANIFEST_MERGE_ENABLED, "false")
+        .commit();
+
+    // Add Data File
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    // Add POS Delete
+    table.newRowDelta()
+        .addDeletes(FILE_A_POS_DELETES)
+        .commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    // Add EQ Delete
+    table.newRowDelta()
+        .addDeletes(FILE_A_EQ_DELETES)
+        .commit();
+    Snapshot thirdSnapshot = table.currentSnapshot();
+
+    // Move files to DELETED
+    table.newDelete()
+        .deleteFromRowFilter(Expressions.alwaysTrue())
+        .commit();
+    Snapshot fourthSnapshot = table.currentSnapshot();
+
+    long afterAllDeleted = rightAfterSnapshot();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    ExpireSnapshots.Result result = SparkActions.get().expireSnapshots(table)
+        .expireOlderThan(afterAllDeleted)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    Set<String> expectedDeletes = Sets.newHashSet(
+        firstSnapshot.manifestListLocation(),
+        secondSnapshot.manifestListLocation(),
+        thirdSnapshot.manifestListLocation(),
+        fourthSnapshot.manifestListLocation(),
+        FILE_A.path().toString(),
+        FILE_A_POS_DELETES.path().toString(),
+        FILE_A_EQ_DELETES.path().toString());
+
+    expectedDeletes.addAll(
+        thirdSnapshot.allManifests().stream()
+            .map(ManifestFile::path)
+            .map(CharSequence::toString).collect(Collectors.toSet()));
+    // Delete operation (fourth snapshot) generates new manifest files
+    expectedDeletes.addAll(
+        fourthSnapshot.allManifests().stream()
+            .map(ManifestFile::path)
+            .map(CharSequence::toString).collect(Collectors.toSet()));
+
+    Assert.assertEquals("Should remove expired manifest lists and deleted data file",
+        expectedDeletes,
+        deletedFiles);
+
+    checkExpirationResults(1, 1, 1, 6, 4, result);
   }
 
   @Test
@@ -980,7 +1073,7 @@ public void testExpireOnEmptyTable() {
         .deleteWith(deletedFiles::add)
         .execute();
 
-    checkExpirationResults(0, 0, 0, result);
+    checkExpirationResults(0, 0, 0, 0, 0, result);
   }
 
   @Test
@@ -1051,7 +1144,7 @@ public void testUseLocalIterator() {
     int jobsAfterStreamResults = spark.sparkContext().dagScheduler().nextJobId().get();
     int jobsRunDuringStreamResults = jobsAfterStreamResults - jobsBeforeStreamResults;
 
-    checkExpirationResults(1L, 1L, 2L, results);
+    checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);
 
     Assert.assertEquals("Expected total number of jobs with stream-results should match the expected number",
         5L, jobsRunDuringStreamResults);