diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java index a08dcf8a2101..468227e12ba2 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java @@ -101,6 +101,11 @@ interface Result { */ long deletedDataFilesCount(); + /** + * Return the number that deleted delete files. + */ + long deletedDeleteFilesCount(); + /** * Returns the number of deleted manifests. */ diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index dde78493e7c4..872d932c317d 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -53,8 +53,9 @@ public class AllManifestsTable extends BaseMetadataTable { Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()), Types.NestedField.optional(12, "lower_bound", Types.StringType.get()), Types.NestedField.optional(13, "upper_bound", Types.StringType.get()) - ))) - ); + ))), + Types.NestedField.optional(14, "content", Types.IntegerType.get()) + ); AllManifestsTable(TableOperations ops, Table table) { this(ops, table, table.name() + ".all_manifests"); diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index 9dbf50e762c7..8911adbd06d5 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -56,6 +56,12 @@ public static CloseableIterable readPaths(ManifestFile manifest, FileIO entry -> entry.file().path().toString()); } + public static CloseableIterable readDeleteFiles(ManifestFile manifestFile, FileIO io) { + return CloseableIterable.transform( + readDeleteManifest(manifestFile, io, null).select(ImmutableList.of("file_path")).liveEntries(), + entry -> entry.file().path().toString()); + } + /** * Returns a new {@link ManifestReader} for a {@link ManifestFile}. *

diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index e7b92221646e..c030c28ef277 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -41,8 +41,9 @@ public class ManifestsTable extends BaseMetadataTable { Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()), Types.NestedField.optional(12, "lower_bound", Types.StringType.get()), Types.NestedField.optional(13, "upper_bound", Types.StringType.get()) - ))) - ); + ))), + Types.NestedField.required(14, "content", Types.IntegerType.get()) + ); private final PartitionSpec spec; @@ -94,8 +95,9 @@ static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile man manifest.addedFilesCount(), manifest.existingFilesCount(), manifest.deletedFilesCount(), - partitionSummariesToRows(spec, manifest.partitions()) - ); + partitionSummariesToRows(spec, manifest.partitions()), + manifest.content().id() + ); } static List partitionSummariesToRows(PartitionSpec spec, diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java index 5b10461b1bf4..0d75887cfcd5 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java @@ -22,13 +22,16 @@ public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result { private final long deletedDataFilesCount; + private final long deletedDeleteFilesCount; private final long deletedManifestsCount; private final long deletedManifestListsCount; public BaseExpireSnapshotsActionResult(long deletedDataFilesCount, + long deletedDeleteFilesCount, long deletedManifestsCount, long deletedManifestListsCount) { this.deletedDataFilesCount = deletedDataFilesCount; + this.deletedDeleteFilesCount = deletedDeleteFilesCount; this.deletedManifestsCount = deletedManifestsCount; this.deletedManifestListsCount = deletedManifestListsCount; } @@ -38,6 +41,11 @@ public long deletedDataFilesCount() { return deletedDataFilesCount; } + @Override + public long deletedDeleteFilesCount() { + return deletedDeleteFilesCount; + } + @Override public long deletedManifestsCount() { return deletedManifestsCount; diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java index 8473e5042ecb..65105d74c95b 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java @@ -23,18 +23,22 @@ public class ExpireSnapshotsActionResult { private final Long dataFilesDeleted; + private final Long deleteFilesDeleted; private final Long manifestFilesDeleted; private final Long manifestListsDeleted; static ExpireSnapshotsActionResult wrap(ExpireSnapshots.Result result) { return new ExpireSnapshotsActionResult( result.deletedDataFilesCount(), + result.deletedDeleteFilesCount(), result.deletedManifestsCount(), result.deletedManifestListsCount()); } - public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) { + public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long deleteFilesDeleted, + Long manifestFilesDeleted, Long manifestListsDeleted) { this.dataFilesDeleted = dataFilesDeleted; + this.deleteFilesDeleted = deleteFilesDeleted; this.manifestFilesDeleted = manifestFilesDeleted; this.manifestListsDeleted = manifestListsDeleted; } @@ -43,6 +47,10 @@ public Long dataFilesDeleted() { return dataFilesDeleted; } + public Long deleteFiledDeleted() { + return deleteFilesDeleted; + } + public Long manifestFilesDeleted() { return manifestFilesDeleted; } diff --git a/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java b/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java index f51499dcf1aa..257de88d5692 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java @@ -27,6 +27,7 @@ public class ManifestFileBean implements ManifestFile { private String path = null; private Long length = null; private Integer partitionSpecId = null; + private Integer content = null; private Long addedSnapshotId = null; public String getPath() { @@ -61,6 +62,14 @@ public void setAddedSnapshotId(Long addedSnapshotId) { this.addedSnapshotId = addedSnapshotId; } + public Integer getContent() { + return content; + } + + public void setContent(Integer content) { + this.content = content; + } + @Override public String path() { return path; @@ -78,7 +87,8 @@ public int partitionSpecId() { @Override public ManifestContent content() { - return ManifestContent.DATA; + return (content != null && content == ManifestContent.DELETES.id()) ? + ManifestContent.DELETES : ManifestContent.DATA; } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java index 6cbeba119b29..fa0c2c35071b 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java @@ -68,6 +68,7 @@ public class BaseExpireSnapshotsSparkAction private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class); private static final String DATA_FILE = "Data File"; + private static final String DELETE_FILE = "Delete File"; private static final String MANIFEST = "Manifest"; private static final String MANIFEST_LIST = "Manifest List"; @@ -201,8 +202,9 @@ private Dataset appendTypeString(Dataset ds, String type) { private Dataset buildValidFileDF(TableMetadata metadata) { Table staticTable = newStaticTable(metadata, this.table.io()); return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE) + .union(appendTypeString(buildValidDeleteFileDF(staticTable), DELETE_FILE) .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST)) - .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST)); + .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST))); } /** @@ -213,6 +215,7 @@ private Dataset buildValidFileDF(TableMetadata metadata) { */ private BaseExpireSnapshotsActionResult deleteFiles(Iterator expired) { AtomicLong dataFileCount = new AtomicLong(0L); + AtomicLong deleteFileCount = new AtomicLong(0L); AtomicLong manifestCount = new AtomicLong(0L); AtomicLong manifestListCount = new AtomicLong(0L); @@ -231,7 +234,11 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator expired) { switch (type) { case DATA_FILE: dataFileCount.incrementAndGet(); - LOG.trace("Deleted Data File: {}", file); + LOG.info("Deleted Data File: {}", file); + break; + case DELETE_FILE: + deleteFileCount.incrementAndGet(); + LOG.info("Deleted Delete File:{}", file); break; case MANIFEST: manifestCount.incrementAndGet(); @@ -244,7 +251,9 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator expired) { } }); - LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get()); - return new BaseExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get()); + LOG.info("Deleted {} total files", dataFileCount.get() + deleteFileCount.get() + + manifestCount.get() + manifestListCount.get()); + return new BaseExpireSnapshotsActionResult(dataFileCount.get(), deleteFileCount.get(), + manifestCount.get(), manifestListCount.get()); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java index 073be71bc794..590e771d4043 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java @@ -146,8 +146,9 @@ public RemoveOrphanFiles.Result execute() { private RemoveOrphanFiles.Result doExecute() { Dataset validDataFileDF = buildValidDataFileDF(table); + Dataset validDeleteFileDF = buildValidDeleteFileDF(table); Dataset validMetadataFileDF = buildValidMetadataFileDF(table, ops); - Dataset validFileDF = validDataFileDF.union(validMetadataFileDF); + Dataset validFileDF = validDataFileDF.union(validDeleteFileDF).union(validMetadataFileDF); Dataset actualFileDF = buildActualFileDF(); Column actualFileName = filenameUDF.apply(actualFileDF.col("file_path")); diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index e760a6de4236..241dad808462 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Snapshot; @@ -45,6 +46,7 @@ import org.apache.iceberg.spark.SparkUtil; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.DataFrameReader; @@ -152,14 +154,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) { protected Dataset buildValidDataFileDF(Table table) { JavaSparkContext context = new JavaSparkContext(spark.sparkContext()); Broadcast ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table)); + return loadAllManifestFileBean(table).filter((FilterFunction) manifest -> + manifest.content() == ManifestContent.DATA) + .flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path"); + } - Dataset allManifests = loadMetadataTable(table, ALL_MANIFESTS) - .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId") - .dropDuplicates("path") - .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks - .as(Encoders.bean(ManifestFileBean.class)); - - return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path"); + protected Dataset buildValidDeleteFileDF(Table table) { + JavaSparkContext context = new JavaSparkContext(spark.sparkContext()); + Broadcast ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table)); + return loadAllManifestFileBean(table).filter((FilterFunction) manifest -> + manifest.content() == ManifestContent.DELETES) + .flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path"); } protected Dataset buildManifestFileDF(Table table) { @@ -190,6 +195,15 @@ protected Dataset buildValidMetadataFileDF(Table table, TableOperations ops .orNoop() .build(); + private Dataset loadAllManifestFileBean(Table table) { + return loadMetadataTable(table, ALL_MANIFESTS) + .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "content", + "added_snapshot_id as addedSnapshotId") + .dropDuplicates("path") + .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks + .as(Encoders.bean(ManifestFileBean.class)); + } + private Dataset loadCatalogMetadataTable(String tableName, MetadataTableType type) { Preconditions.checkArgument(!LOAD_CATALOG.isNoop(), "Cannot find Spark3Util class but Spark3 is in use"); return LOAD_CATALOG.asStatic().invoke(spark, tableName, type); @@ -235,7 +249,13 @@ private static class ReadManifest implements FlatMapFunction call(ManifestFileBean manifest) { - return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator()); + switch (manifest.content()) { + case DATA: + return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator()); + case DELETES: + return new ClosingIterator<>(ManifestFiles.readDeleteFiles(manifest, io.getValue()).iterator()); + } + throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content()); } } } diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java index 5729a04a4201..a280743fbda8 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java +++ b/spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java @@ -122,11 +122,13 @@ private Long rightAfterSnapshot(long snapshotId) { return end; } - private void checkExpirationResults(long expectedDatafiles, long expectedManifestsDeleted, + private void checkExpirationResults(long expectedDatafiles, long expectedDeletefiles, long expectedManifestsDeleted, long expectedManifestListsDeleted, ExpireSnapshotsActionResult results) { Assert.assertEquals("Incorrect number of manifest files deleted", (Long) expectedManifestsDeleted, results.manifestFilesDeleted()); + Assert.assertEquals("Incorrect number of deletefiles deleted", + (Long) expectedDeletefiles, results.deleteFiledDeleted()); Assert.assertEquals("Incorrect number of datafiles deleted", (Long) expectedDatafiles, results.dataFilesDeleted()); Assert.assertEquals("Incorrect number of manifest lists deleted", @@ -154,7 +156,7 @@ public void testFilesCleaned() throws Exception { Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots())); - checkExpirationResults(1L, 1L, 2L, results); + checkExpirationResults(1L, 0L, 1L, 2L, results); } @Test @@ -203,7 +205,7 @@ public void dataFilesCleanupWithParallelTasks() throws IOException { Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); - checkExpirationResults(2L, 3L, 3L, result); + checkExpirationResults(2L, 0L, 3L, 3L, result); } @Test @@ -213,7 +215,7 @@ public void testNoFilesDeletedWhenNoSnapshotsExpired() throws Exception { .commit(); ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().execute(); - checkExpirationResults(0L, 0L, 0L, results); + checkExpirationResults(0L, 0L, 0L, 0L, results); } @Test @@ -236,7 +238,7 @@ public void testCleanupRepeatedOverwrites() throws Exception { long end = rightAfterSnapshot(); ExpireSnapshotsActionResult results = Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute(); - checkExpirationResults(1L, 39L, 20L, results); + checkExpirationResults(1L, 0L, 39L, 20L, results); } @Test @@ -297,7 +299,7 @@ public void testExpireTwoSnapshotsById() throws Exception { Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId)); Assert.assertEquals("Second snapshot should not be present.", null, table.snapshot(secondSnapshotID)); - checkExpirationResults(0L, 0L, 2L, result); + checkExpirationResults(0L, 0L, 0L, 2L, result); } @Test @@ -323,7 +325,7 @@ public void testRetainLastWithExpireById() { Assert.assertEquals("Should have two snapshots.", 2, Lists.newArrayList(table.snapshots()).size()); Assert.assertEquals("First snapshot should not present.", null, table.snapshot(firstSnapshotId)); - checkExpirationResults(0L, 0L, 1L, result); + checkExpirationResults(0L, 0L, 0L, 1L, result); } @Test @@ -349,7 +351,7 @@ public void testRetainLastWithTooFewSnapshots() { Assert.assertEquals("Should have two snapshots", 2, Lists.newArrayList(table.snapshots()).size()); Assert.assertEquals("First snapshot should still present", firstSnapshotId, table.snapshot(firstSnapshotId).snapshotId()); - checkExpirationResults(0L, 0L, 0L, result); + checkExpirationResults(0L, 0L, 0L, 0L, result); } @Test @@ -380,7 +382,7 @@ public void testRetainLastKeepsExpiringSnapshot() { Assert.assertEquals("Should have three snapshots.", 3, Lists.newArrayList(table.snapshots()).size()); Assert.assertNotNull("Second snapshot should present.", table.snapshot(secondSnapshot.snapshotId())); - checkExpirationResults(0L, 0L, 1L, result); + checkExpirationResults(0L, 0L, 0L, 1L, result); } @Test @@ -425,7 +427,7 @@ public void testExpireOlderThanMultipleCalls() { Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size()); Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId())); - checkExpirationResults(0L, 0L, 2L, result); + checkExpirationResults(0L, 0L, 0L, 2L, result); } @Test @@ -455,7 +457,7 @@ public void testRetainLastMultipleCalls() { Assert.assertEquals("Should have one snapshots.", 1, Lists.newArrayList(table.snapshots()).size()); Assert.assertNull("Second snapshot should not present.", table.snapshot(secondSnapshot.snapshotId())); - checkExpirationResults(0L, 0L, 2L, result); + checkExpirationResults(0L, 0L, 0L, 2L, result); } @Test @@ -493,7 +495,7 @@ public void testScanExpiredManifestInValidSnapshotAppend() { .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); - checkExpirationResults(1L, 1L, 2L, result); + checkExpirationResults(1L, 0L, 1L, 2L, result); } @Test @@ -527,7 +529,7 @@ public void testScanExpiredManifestInValidSnapshotFastAppend() { .execute(); Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); - checkExpirationResults(1L, 1L, 2L, result); + checkExpirationResults(1L, 0L, 1L, 2L, result); } /** @@ -565,7 +567,7 @@ public void testWithExpiringDanglingStageCommit() { .expireOlderThan(snapshotB.timestampMillis() + 1) .execute(); - checkExpirationResults(1L, 1L, 2L, result); + checkExpirationResults(1L, 0L, 1L, 2L, result); Set expectedDeletes = new HashSet<>(); expectedDeletes.add(snapshotA.manifestListLocation()); @@ -651,7 +653,7 @@ public void testWithCherryPickTableSnapshot() { }); }); - checkExpirationResults(1L, 2L, 2L, result); + checkExpirationResults(1L, 0L, 2L, 2L, result); } /** @@ -705,7 +707,7 @@ public void testWithExpiringStagedThenCherrypick() { Assert.assertFalse(deletedFiles.contains(item.path().toString())); }); }); - checkExpirationResults(0L, 1L, 1L, firstResult); + checkExpirationResults(0L, 0L, 1L, 1L, firstResult); // Expire all snapshots including cherry-pick ExpireSnapshotsActionResult secondResult = Actions.forTable(table).expireSnapshots() @@ -719,7 +721,7 @@ public void testWithExpiringStagedThenCherrypick() { Assert.assertFalse(deletedFiles.contains(item.path().toString())); }); }); - checkExpirationResults(0L, 0L, 2L, secondResult); + checkExpirationResults(0L, 0L, 0L, 2L, secondResult); } @Test @@ -752,7 +754,7 @@ public void testExpireOlderThan() { Assert.assertEquals("Should remove only the expired manifest list location", Sets.newHashSet(firstSnapshot.manifestListLocation()), deletedFiles); - checkExpirationResults(0, 0, 1, result); + checkExpirationResults(0, 0L, 0, 1, result); } @Test @@ -805,7 +807,7 @@ public void testExpireOlderThanWithDelete() { FILE_A.path()), // deleted deletedFiles); - checkExpirationResults(1, 2, 2, result); + checkExpirationResults(1, 0L, 2, 2, result); } @Test @@ -863,7 +865,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() { FILE_A.path()), // deleted deletedFiles); - checkExpirationResults(1, 1, 2, result); + checkExpirationResults(1, 0L, 1, 2, result); } @Test @@ -918,7 +920,7 @@ public void testExpireOlderThanWithRollback() { Iterables.getOnlyElement(secondSnapshotManifests).path()), // manifest is no longer referenced deletedFiles); - checkExpirationResults(0, 1, 1, result); + checkExpirationResults(0, 0L, 1, 1, result); } @Test @@ -968,7 +970,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() { FILE_B.path()), // added, but rolled back deletedFiles); - checkExpirationResults(1, 1, 1, result); + checkExpirationResults(1, 0L, 1, 1, result); } @Test @@ -981,7 +983,7 @@ public void testExpireOnEmptyTable() { .deleteWith(deletedFiles::add) .execute(); - checkExpirationResults(0, 0, 0, result); + checkExpirationResults(0, 0L, 0, 0, result); } @Test @@ -1053,7 +1055,7 @@ public void testUseLocalIterator() { int jobsAfter = spark.sparkContext().dagScheduler().nextJobId().get(); int totalJobsRun = jobsAfter - jobsBefore; - checkExpirationResults(1L, 1L, 2L, results); + checkExpirationResults(1L, 0L, 1L, 2L, results); Assert.assertTrue( String.format("Expected more than %d jobs when using local iterator, ran %d", SHUFFLE_PARTITIONS, totalJobsRun), diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index b3dcd1ea55f6..a1c1de77d0ed 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -717,6 +717,7 @@ public void testManifestsTable() { .set("upper_bound", "1") .build() )) + .set("content", manifest.content().id()) .build() ); @@ -772,6 +773,7 @@ public void testAllManifestsTable() { .set("upper_bound", "1") .build() )) + .set("content", manifest.content().id()) .build() )); diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java index ba1e6b087750..34c02e651c3e 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java @@ -100,6 +100,7 @@ public InternalRow[] call(InternalRow args) { private InternalRow[] toOutputRows(ExpireSnapshots.Result result) { InternalRow row = newInternalRow( result.deletedDataFilesCount(), + result.deletedDeleteFilesCount(), result.deletedManifestsCount(), result.deletedManifestListsCount() );