9 changes: 5 additions & 4 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -50,7 +50,9 @@
* <p>This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
private static final int REF_SNAPSHOT_ID = 18;
public static final Types.NestedField REF_SNAPSHOT_ID =
Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());

private static final Schema MANIFEST_FILE_SCHEMA =
new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
@@ -74,8 +76,7 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
Types.NestedField.required(
REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get()));
REF_SNAPSHOT_ID);

AllManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_manifests");
@@ -424,7 +425,7 @@ private <T> Boolean compareSnapshotRef(
}

private <T> boolean isSnapshotRef(BoundReference<T> ref) {
return ref.fieldId() == REF_SNAPSHOT_ID;
return ref.fieldId() == REF_SNAPSHOT_ID.fieldId();
}
}
}
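For context, here is a minimal, hypothetical sketch (not part of this diff) of what making REF_SNAPSHOT_ID public enables: filtering the all_manifests metadata table by reference_snapshot_id from Spark. The Spark session and the table name db.tbl are assumptions for illustration.

import static org.apache.spark.sql.functions.col;

import java.util.Set;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AllManifestsFilterSketch {
  // Returns only the manifests referenced by the given snapshots, using the now-public field name.
  public static Dataset<Row> manifestsForSnapshots(SparkSession spark, Set<Long> snapshotIds) {
    Dataset<Row> allManifests = spark.table("db.tbl.all_manifests"); // hypothetical table name
    return allManifests.filter(
        col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds));
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    manifestsForSnapshots(spark, Sets.newHashSet(1L, 2L)).show();
  }
}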
18 changes: 17 additions & 1 deletion core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -100,10 +101,25 @@ private static TableMetadata findFirstExistentPreviousMetadata(
* Returns locations of manifest lists in a table.
*
* @param table table for which manifestList needs to be fetched
* @return the location of manifest Lists
* @return the location of manifest lists
*/
public static List<String> manifestListLocations(Table table) {
return manifestListLocations(table, null);
}

/**
* Returns locations of manifest lists in a table.
*
* @param table table for which manifestList needs to be fetched
* @param snapshotIds ids of snapshots for which manifest lists will be returned
* @return the location of manifest lists
*/
public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
Iterable<Snapshot> snapshots = table.snapshots();
if (snapshotIds != null) {
snapshots = Iterables.filter(snapshots, s -> snapshotIds.contains(s.snapshotId()));
}

List<String> manifestListLocations = Lists.newArrayList();
for (Snapshot snapshot : snapshots) {
String manifestListLocation = snapshot.manifestListLocation();
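A brief usage sketch of the new overload (the snapshot ids are placeholders): passing a set restricts the result to those snapshots, while passing null keeps the existing behavior and returns manifest list locations for every snapshot.

import java.util.List;
import java.util.Set;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class ManifestListLocationsSketch {
  public static List<String> locationsForSnapshots(Table table) {
    Set<Long> snapshotIds = Sets.newHashSet(1L, 2L); // placeholder ids
    // only the manifest lists of the given snapshots are returned
    return ReachableFileUtil.manifestListLocations(table, snapshotIds);
  }

  public static List<String> allLocations(Table table) {
    // null means no filter: manifest lists of every snapshot in the table
    return ReachableFileUtil.manifestListLocations(table, null);
  }
}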
@@ -25,11 +25,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}

// builds a DF of delete and data file path and type by reading all manifests
protected Dataset<FileInfo> contentFileDS(Table table) {
return contentFileDS(table, null);
}

protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();

Dataset<ManifestFileBean> allManifests =
loadMetadataTable(table, ALL_MANIFESTS)
Dataset<ManifestFileBean> manifestBeanDS =
manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
@@ -155,17 +161,35 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);

return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}

protected Dataset<FileInfo> manifestDS(Table table) {
return loadMetadataTable(table, ALL_MANIFESTS)
return manifestDS(table, null);
}

protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
return manifestDF(table, snapshotIds)
.select(col("path"), lit(MANIFEST).as("type"))
.as(FileInfo.ENCODER);
}

private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
if (snapshotIds != null) {
Column filterCond = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
return manifestDF.filter(filterCond);
} else {
return manifestDF;
}
}

protected Dataset<FileInfo> manifestListDS(Table table) {
List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
return manifestListDS(table, null);
}

protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
List<String> manifestLists = ReachableFileUtil.manifestListLocations(table, snapshotIds);
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}

@@ -26,7 +26,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public Dataset<Row> expire() {
public Dataset<FileInfo> expireFiles() {
if (expiredFileDS == null) {
// fetch metadata before expiration
Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
TableMetadata originalMetadata = ops.current();

// perform expiration
org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
@@ -184,11 +186,16 @@ public Dataset<FileInfo> expireFiles() {

expireSnapshots.cleanExpiredFiles(false).commit();

// fetch metadata after expiration
Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);

// fetch files referenced by expired snapshots
Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);

// determine expired files
this.expiredFileDS = originalFileDS.except(validFileDS);
this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
}

return expiredFileDS;
@@ -236,11 +243,25 @@ private boolean streamResults() {
return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}

private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
private Dataset<FileInfo> fileDS(TableMetadata metadata) {
return fileDS(metadata, null);
}

private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
return contentFileDS(staticTable)
.union(manifestDS(staticTable))
.union(manifestListDS(staticTable));
return contentFileDS(staticTable, snapshotIds)
.union(manifestDS(staticTable, snapshotIds))
.union(manifestListDS(staticTable, snapshotIds));
}

private Set<Long> findExpiredSnapshotIds(
TableMetadata originalMetadata, TableMetadata updatedMetadata) {
Set<Long> retainedSnapshots =
updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
return originalMetadata.snapshots().stream()
.map(Snapshot::snapshotId)
.filter(id -> !retainedSnapshots.contains(id))
.collect(Collectors.toSet());
}

private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
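For orientation, a usage sketch of how this code path is exercised (mirroring the tests below): snapshots older than a timestamp are expired, and only files reachable solely from the expired snapshots reach the delete callback. The olderThanMillis parameter is an assumption of the sketch.

import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.actions.SparkActions;

public class ExpireSnapshotsUsageSketch {
  public static Set<String> expireOlderThan(Table table, long olderThanMillis) {
    Set<String> deletedFiles = Sets.newHashSet();
    // files no longer reachable from any retained snapshot are passed to deleteWith
    SparkActions.get()
        .expireSnapshots(table)
        .expireOlderThan(olderThanMillis)
        .deleteWith(deletedFiles::add)
        .execute();
    return deletedFiles;
  }
}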
@@ -37,6 +37,7 @@
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public void testExpireAfterExecute() {
List<Row> untypedExpiredFiles = action.expire().collectAsList();
Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
}

@Test
public void testExpireFileDeletionMostExpired() {
testExpireFilesAreDeleted(5, 2);
}

@Test
public void testExpireFileDeletionMostRetained() {
testExpireFilesAreDeleted(2, 5);
}

public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
Contributor: I think this test is okay, but I think that it doesn't really test cases where the retained and deleted files are mixed together. That deserves a direct test, where the deleteCandidateFileDS actually contains files in the validFileDS.

Member Author: You are right, added test below.

// Add data files to be expired
Set<String> dataFiles = Sets.newHashSet();
for (int i = 0; i < dataFilesExpired; i++) {
DataFile df =
DataFiles.builder(SPEC)
.withPath(String.format("/path/to/data-expired-%d.parquet", i))
.withFileSizeInBytes(10)
.withPartitionPath("c1=1")
.withRecordCount(1)
.build();
dataFiles.add(df.path().toString());
table.newFastAppend().appendFile(df).commit();
}

// Delete them all, these will be deleted on expire snapshot
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
// Clears "DELETED" manifests
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);

// Add data files to be retained, which are not deleted.
for (int i = 0; i < dataFilesRetained; i++) {
DataFile df =
DataFiles.builder(SPEC)
.withPath(String.format("/path/to/data-retained-%d.parquet", i))
.withFileSizeInBytes(10)
.withPartitionPath("c1=1")
.withRecordCount(1)
.build();
table.newFastAppend().appendFile(df).commit();
}

long end = rightAfterSnapshot();

Set<String> expectedDeletes = Sets.newHashSet();
expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
// all snapshot manifest lists except current will be deleted
expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
expectedDeletes.addAll(
manifestsBefore); // new manifests are reachable from current snapshot and not deleted
expectedDeletes.addAll(
dataFiles); // new data files are reachable from current snapshot and not deleted

Set<String> deletedFiles = Sets.newHashSet();
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(end)
.deleteWith(deletedFiles::add)
.execute();

Assert.assertEquals(
"All reachable files before expiration should be deleted", expectedDeletes, deletedFiles);
}

@Test
public void testExpireSomeCheckFilesDeleted() {

table.newAppend().appendFile(FILE_A).commit();

table.newAppend().appendFile(FILE_B).commit();

table.newAppend().appendFile(FILE_C).commit();

table.newDelete().deleteFile(FILE_A).commit();

long after = rightAfterSnapshot();
waitUntilAfter(after);

table.newAppend().appendFile(FILE_D).commit();

table.newDelete().deleteFile(FILE_B).commit();

Set<String> deletedFiles = Sets.newHashSet();
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(after)
.deleteWith(deletedFiles::add)
.execute();

// C, D should be retained (live)
// B should be retained (previous snapshot points to it)
// A should be deleted
Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
}
}
@@ -37,6 +37,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public static Set<DeleteFile> deleteFiles(Table table) {

return deleteFiles;
}

public static Set<String> reachableManifestPaths(Table table) {
return StreamSupport.stream(table.snapshots().spliterator(), false)
.flatMap(s -> s.allManifests(table.io()).stream())
.map(ManifestFile::path)
.collect(Collectors.toSet());
}
}

Contributor: There's actually a method in ManifestFiles that more efficiently returns just the paths if you want to use it. @amogh-jahagirdar uses it in the non-Spark reachable file implementation: #5669 (comment)

Member Author: Not sure if I miss something, but looks like that is for data files of a manifest, while this is for listing manifests?

Contributor: Yeah, it's for listing live data file paths for a given manifest.