diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 395efd06094c..3f4b7e9b25ea 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -50,7 +50,9 @@
*
* <p>This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
- private static final int REF_SNAPSHOT_ID = 18;
+ public static final Types.NestedField REF_SNAPSHOT_ID =
+ Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());
+
private static final Schema MANIFEST_FILE_SCHEMA =
new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
@@ -74,8 +76,7 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
- Types.NestedField.required(
- REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get()));
+ REF_SNAPSHOT_ID);
AllManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_manifests");
@@ -424,7 +425,7 @@ private Boolean compareSnapshotRef(
}
private <T> boolean isSnapshotRef(BoundReference<T> ref) {
- return ref.fieldId() == REF_SNAPSHOT_ID;
+ return ref.fieldId() == REF_SNAPSHOT_ID.fieldId();
}
}
}
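Making `REF_SNAPSHOT_ID` a public `NestedField` (rather than a private int id) lets callers build predicates on `reference_snapshot_id` by name instead of hard-coding the column. A minimal sketch of that usage, assuming a hypothetical caller-supplied table and snapshot ids:

```java
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class ManifestScanExample {
  // scan the all_manifests metadata table restricted to the given snapshots
  public static TableScan scanManifestsFor(Table table, Long... snapshotIds) {
    Table allManifests =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.ALL_MANIFESTS);
    // the SnapshotEvaluator in AllManifestsTable can evaluate this predicate per
    // snapshot, so manifest lists of unrelated snapshots are skipped entirely
    Expression bySnapshot = Expressions.in(AllManifestsTable.REF_SNAPSHOT_ID.name(), snapshotIds);
    return allManifests.newScan().filter(bySnapshot);
  }
}
```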
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index fdba8e295752..4a7064fda5ee 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -100,10 +101,25 @@ private static TableMetadata findFirstExistentPreviousMetadata(
* Returns locations of manifest lists in a table.
*
* @param table table for which manifestList needs to be fetched
- * @return the location of manifest Lists
+ * @return the location of manifest lists
*/
public static List<String> manifestListLocations(Table table) {
+ return manifestListLocations(table, null);
+ }
+
+ /**
+ * Returns locations of manifest lists in a table.
+ *
+ * @param table table for which manifestList needs to be fetched
+ * @param snapshotIds ids of snapshots for which manifest lists will be returned
+ * @return the location of manifest lists
+ */
+ public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
Iterable<Snapshot> snapshots = table.snapshots();
+ if (snapshotIds != null) {
+ snapshots = Iterables.filter(snapshots, s -> snapshotIds.contains(s.snapshotId()));
+ }
+
List<String> manifestListLocations = Lists.newArrayList();
for (Snapshot snapshot : snapshots) {
String manifestListLocation = snapshot.manifestListLocation();
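A short usage sketch of the new overload (the snapshot ids here are hypothetical): passing `null` keeps the old behavior of returning every snapshot's manifest list, while a non-null set restricts the result to those snapshots.

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class ManifestListExample {
  public static List<String> manifestListsFor(Table table) {
    Set<Long> snapshotIds = Sets.newHashSet(1L, 2L); // hypothetical snapshot ids
    // only manifest lists belonging to snapshots 1 and 2 are returned
    return ReachableFileUtil.manifestListLocations(table, snapshotIds);
  }
}
```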
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 74a0e70738a7..cdd80040fa9e 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}
- // builds a DF of delete and data file path and type by reading all manifests
protected Dataset<FileInfo> contentFileDS(Table table) {
+ return contentFileDS(table, null);
+ }
+
+ protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
- Dataset<ManifestFileBean> allManifests =
- loadMetadataTable(table, ALL_MANIFESTS)
+ Dataset<ManifestFileBean> manifestBeanDS =
+ manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
@@ -155,17 +161,35 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
- return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+ return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}
protected Dataset<FileInfo> manifestDS(Table table) {
- return loadMetadataTable(table, ALL_MANIFESTS)
+ return manifestDS(table, null);
+ }
+
+ protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
+ return manifestDF(table, snapshotIds)
.select(col("path"), lit(MANIFEST).as("type"))
.as(FileInfo.ENCODER);
}
+ private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
+ Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
+ if (snapshotIds != null) {
+ Column filterCond = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
+ return manifestDF.filter(filterCond);
+ } else {
+ return manifestDF;
+ }
+ }
+
protected Dataset<FileInfo> manifestListDS(Table table) {
- List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
+ return manifestListDS(table, null);
+ }
+
+ protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
+ List<String> manifestLists = ReachableFileUtil.manifestListLocations(table, snapshotIds);
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}
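The new `manifestDF` helper is where the snapshot-id restriction is applied: the filter is pushed into the `all_manifests` metadata table before any manifest is opened. A minimal, self-contained sketch of the same pushdown from the Spark side, assuming a hypothetical table name `db.tbl`:

```java
import static org.apache.spark.sql.functions.col;

import java.util.Set;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ManifestFilterExample {
  public static Dataset<Row> manifestsForSnapshots(SparkSession spark, Set<Long> snapshotIds) {
    Dataset<Row> allManifests = spark.read().format("iceberg").load("db.tbl.all_manifests");
    // isInCollection builds an IN predicate on reference_snapshot_id that Iceberg
    // can evaluate per snapshot, avoiding a scan of every snapshot's manifest list
    return allManifests.filter(col("reference_snapshot_id").isInCollection(snapshotIds));
  }
}
```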
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index b47f367ddebe..d9af48c221f1 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -26,7 +26,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public Dataset<Row> expire() {
public Dataset<FileInfo> expireFiles() {
if (expiredFileDS == null) {
// fetch metadata before expiration
- Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+ TableMetadata originalMetadata = ops.current();
// perform expiration
org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
@@ -184,11 +186,16 @@ public Dataset<FileInfo> expireFiles() {
expireSnapshots.cleanExpiredFiles(false).commit();
- // fetch metadata after expiration
- Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+ // fetch valid files after expiration
+ TableMetadata updatedMetadata = ops.refresh();
+ Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
+
+ // fetch files referenced by expired snapshots
+ Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
+ Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
// determine expired files
- this.expiredFileDS = originalFileDS.except(validFileDS);
+ this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
}
return expiredFileDS;
@@ -236,11 +243,25 @@ private boolean streamResults() {
return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}
- private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+ private Dataset<FileInfo> fileDS(TableMetadata metadata) {
+ return fileDS(metadata, null);
+ }
+
+ private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
- return contentFileDS(staticTable)
- .union(manifestDS(staticTable))
- .union(manifestListDS(staticTable));
+ return contentFileDS(staticTable, snapshotIds)
+ .union(manifestDS(staticTable, snapshotIds))
+ .union(manifestListDS(staticTable, snapshotIds));
+ }
+
+ private Set<Long> findExpiredSnapshotIds(
+ TableMetadata originalMetadata, TableMetadata updatedMetadata) {
+ Set<Long> retainedSnapshots =
+ updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return originalMetadata.snapshots().stream()
+ .map(Snapshot::snapshotId)
+ .filter(id -> !retainedSnapshots.contains(id))
+ .collect(Collectors.toSet());
}
private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
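The core change in `expireFiles()` is the candidate computation: instead of diffing every file reachable before expiration against every file reachable after (which reads all manifests twice), delete candidates are now only the files reachable from expired snapshots, with still-valid files subtracted. A standalone sketch of that set arithmetic on in-memory sets (the `FileInfo` datasets stand in for these hypothetical string sets):

```java
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class ExpiryMathExample {
  // mirrors deleteCandidateFileDS.except(validFileDS) from the action
  public static Set<String> filesToDelete(
      Set<String> reachableFromExpiredSnapshots, Set<String> stillValidFiles) {
    return Sets.difference(reachableFromExpiredSnapshots, stillValidFiles).immutableCopy();
  }
}
```

The design choice here is that `findExpiredSnapshotIds` is a plain set difference of snapshot ids before and after the commit, so the expensive manifest reads are scoped by those ids rather than by full table metadata.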
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b2421abfb92b..8f10538f6305 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public void testExpireAfterExecute() {
List<Row> untypedExpiredFiles = action.expire().collectAsList();
Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
}
+
+ @Test
+ public void testExpireFileDeletionMostExpired() {
+ testExpireFilesAreDeleted(5, 2);
+ }
+
+ @Test
+ public void testExpireFileDeletionMostRetained() {
+ testExpireFilesAreDeleted(2, 5);
+ }
+
+ public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
+ // Add data files to be expired
+ Set<String> dataFiles = Sets.newHashSet();
+ for (int i = 0; i < dataFilesExpired; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-expired-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ dataFiles.add(df.path().toString());
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+ // Delete them all; these files will be removed when the snapshots expire
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+ // Clears "DELETED" manifests
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+ Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
+
+ // Add data files to be retained; these will not be deleted on expiration
+ for (int i = 0; i < dataFilesRetained; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-retained-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+ long end = rightAfterSnapshot();
+
+ Set<String> expectedDeletes = Sets.newHashSet();
+ expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
+ // all snapshot manifest lists except current will be deleted
+ expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
+ expectedDeletes.addAll(
+ manifestsBefore); // new manifests are reachable from current snapshot and not deleted
+ expectedDeletes.addAll(
+ dataFiles); // new data files are reachable from current snapshot and not deleted
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(end)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+ Assert.assertEquals(
+ "All reachable files before expiration should be deleted", expectedDeletes, deletedFiles);
+ }
+
+ @Test
+ public void testExpireSomeCheckFilesDeleted() {
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newAppend().appendFile(FILE_C).commit();
+
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+ waitUntilAfter(after);
+
+ table.newAppend().appendFile(FILE_D).commit();
+
+ table.newDelete().deleteFile(FILE_B).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+ // C, D should be retained (live)
+ // B should be retained (previous snapshot points to it)
+ // A should be deleted
+ Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
+ }
}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4d5..e5ad0ca2139a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -37,6 +37,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public static Set<DeleteFile> deleteFiles(Table table) {
return deleteFiles;
}
+
+ public static Set<String> reachableManifestPaths(Table table) {
+ return StreamSupport.stream(table.snapshots().spliterator(), false)
+ .flatMap(s -> s.allManifests(table.io()).stream())
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ }
}
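A hedged usage sketch of the new helper: tests capture the reachable manifest set before an operation and diff it afterwards, as `testExpireFilesAreDeleted` does with `manifestsBefore`. The table handle is assumed to come from the surrounding test fixture:

```java
import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.TestHelpers;

public class ManifestDiffExample {
  // manifests that became reachable since the earlier snapshot of the set
  public static Set<String> newManifests(Table table, Set<String> before) {
    Set<String> after = TestHelpers.reachableManifestPaths(table);
    return Sets.difference(after, before).immutableCopy();
  }
}
```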
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 74a0e70738a7..cdd80040fa9e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}
- // builds a DF of delete and data file path and type by reading all manifests
protected Dataset<FileInfo> contentFileDS(Table table) {
+ return contentFileDS(table, null);
+ }
+
+ protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
- Dataset<ManifestFileBean> allManifests =
- loadMetadataTable(table, ALL_MANIFESTS)
+ Dataset<ManifestFileBean> manifestBeanDS =
+ manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
@@ -155,17 +161,35 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
- return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+ return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}
protected Dataset<FileInfo> manifestDS(Table table) {
- return loadMetadataTable(table, ALL_MANIFESTS)
+ return manifestDS(table, null);
+ }
+
+ protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
+ return manifestDF(table, snapshotIds)
.select(col("path"), lit(MANIFEST).as("type"))
.as(FileInfo.ENCODER);
}
+ private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
+ Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
+ if (snapshotIds != null) {
+ Column filterCond = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
+ return manifestDF.filter(filterCond);
+ } else {
+ return manifestDF;
+ }
+ }
+
protected Dataset<FileInfo> manifestListDS(Table table) {
- List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
+ return manifestListDS(table, null);
+ }
+
+ protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
+ List<String> manifestLists = ReachableFileUtil.manifestListLocations(table, snapshotIds);
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index b47f367ddebe..d9af48c221f1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -26,7 +26,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public Dataset<Row> expire() {
public Dataset<FileInfo> expireFiles() {
if (expiredFileDS == null) {
// fetch metadata before expiration
- Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+ TableMetadata originalMetadata = ops.current();
// perform expiration
org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
@@ -184,11 +186,16 @@ public Dataset<FileInfo> expireFiles() {
expireSnapshots.cleanExpiredFiles(false).commit();
- // fetch metadata after expiration
- Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+ // fetch valid files after expiration
+ TableMetadata updatedMetadata = ops.refresh();
+ Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
+
+ // fetch files referenced by expired snapshots
+ Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
+ Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
// determine expired files
- this.expiredFileDS = originalFileDS.except(validFileDS);
+ this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
}
return expiredFileDS;
@@ -236,11 +243,25 @@ private boolean streamResults() {
return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}
- private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+ private Dataset<FileInfo> fileDS(TableMetadata metadata) {
+ return fileDS(metadata, null);
+ }
+
+ private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
- return contentFileDS(staticTable)
- .union(manifestDS(staticTable))
- .union(manifestListDS(staticTable));
+ return contentFileDS(staticTable, snapshotIds)
+ .union(manifestDS(staticTable, snapshotIds))
+ .union(manifestListDS(staticTable, snapshotIds));
+ }
+
+ private Set<Long> findExpiredSnapshotIds(
+ TableMetadata originalMetadata, TableMetadata updatedMetadata) {
+ Set<Long> retainedSnapshots =
+ updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return originalMetadata.snapshots().stream()
+ .map(Snapshot::snapshotId)
+ .filter(id -> !retainedSnapshots.contains(id))
+ .collect(Collectors.toSet());
}
private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b2421abfb92b..7004c6f8e079 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public void testExpireAfterExecute() {
List<Row> untypedExpiredFiles = action.expire().collectAsList();
Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
}
+
+ @Test
+ public void testExpireFileDeletionMostExpired() {
+ testExpireFilesAreDeleted(5, 2);
+ }
+
+ @Test
+ public void testExpireFileDeletionMostRetained() {
+ testExpireFilesAreDeleted(2, 5);
+ }
+
+ public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
+ // Add data files to be expired
+ Set<String> dataFiles = Sets.newHashSet();
+ for (int i = 0; i < dataFilesExpired; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-expired-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ dataFiles.add(df.path().toString());
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+ // Delete them all; these files will be removed when the snapshots expire
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+ // Clears "DELETED" manifests
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+ Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
+
+ // Add data files to be retained; these will not be deleted on expiration
+ for (int i = 0; i < dataFilesRetained; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-retained-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+ long end = rightAfterSnapshot();
+
+ Set<String> expectedDeletes = Sets.newHashSet();
+ expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
+ // all snapshot manifest lists except current will be deleted
+ expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
+ expectedDeletes.addAll(
+ manifestsBefore); // new manifests are reachable from current snapshot and not deleted
+ expectedDeletes.addAll(
+ dataFiles); // new data files are reachable from current snapshot and not deleted
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(end)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+ Assert.assertEquals(
+ "All reachable files before expiration should be deleted", expectedDeletes, deletedFiles);
+ }
+
+ @Test
+ public void testExpireSomeCheckFilesDeleted() {
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newAppend().appendFile(FILE_C).commit();
+
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+ waitUntilAfter(after);
+
+ table.newAppend().appendFile(FILE_D).commit();
+
+ table.newDelete().deleteFile(FILE_B).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+ // C, D should be retained (live)
+ // B should be retained (previous snapshot points to it)
+ // A should be deleted
+ Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
+ }
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4d5..e5ad0ca2139a 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -37,6 +37,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public static Set<DeleteFile> deleteFiles(Table table) {
return deleteFiles;
}
+
+ public static Set<String> reachableManifestPaths(Table table) {
+ return StreamSupport.stream(table.snapshots().spliterator(), false)
+ .flatMap(s -> s.allManifests(table.io()).stream())
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ }
}