Merged
32 changes: 23 additions & 9 deletions spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -20,8 +20,10 @@
 package org.apache.iceberg.actions;
 
 import java.util.List;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -36,7 +38,10 @@ abstract class BaseAction<R> implements Action<R> {
   protected abstract Table table();
 
   protected String metadataTableName(MetadataTableType type) {
-    String tableName = table().toString();
+    return metadataTableName(table().toString(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
     if (tableName.contains("/")) {
       return tableName + "#" + type;
     } else if (tableName.startsWith("hadoop.")) {
@@ -56,9 +61,9 @@ protected String metadataTableName(MetadataTableType type) {
-   * @param table the table
+   * @param snapshots the snapshots whose manifest list locations should be collected
    * @return the paths of the Manifest Lists
    */
-  protected List<String> getManifestListPaths(Table table) {
+  private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
     List<String> manifestLists = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
+    for (Snapshot snapshot : snapshots) {
       String manifestListLocation = snapshot.manifestListLocation();
       if (manifestListLocation != null) {
         manifestLists.add(manifestListLocation);
@@ -73,7 +78,7 @@ protected List<String> getManifestListPaths(Table table) {
    * @param ops TableOperations for the table we will be getting paths from
    * @return a list of paths to metadata files
    */
-  protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
+  private List<String> getOtherMetadataFilePaths(TableOperations ops) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
 
@@ -86,27 +91,36 @@ protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
   }
 
   protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
-    String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
+    return buildValidDataFileDF(spark, table().toString());
+  }
+
+  protected Dataset<Row> buildValidDataFileDF(SparkSession spark, String tableName) {
+    String allDataFilesMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_DATA_FILES);
     return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
   }
 
-  protected Dataset<Row> buildManifestFileDF(SparkSession spark) {
-    String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS);
+  protected Dataset<Row> buildManifestFileDF(SparkSession spark, String tableName) {
Member Author commented: We add these 2-arg versions so that we can specify metadata JSON files directly; the single-arg versions just use the current table state as before.

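A minimal sketch of the two call styles this enables (the session setup and both locations below are hypothetical, not from this PR):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class MetadataTableNameSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Single-arg overload: resolves against the current table state,
        // e.g. a catalog table name such as db.sample.
        Dataset<Row> current = spark.read().format("iceberg")
            .load("db.sample.all_data_files");

        // Two-arg overload with a metadata JSON location: the name contains
        // "/", so metadataTableName() appends "#" + type to it.
        Dataset<Row> pinned = spark.read().format("iceberg")
            .load("/warehouse/db/sample/metadata/v3.metadata.json#all_data_files");
      }
    }
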
+    String allManifestsMetadataTable = metadataTableName(tableName, MetadataTableType.ALL_MANIFESTS);
     return spark.read().format("iceberg").load(allManifestsMetadataTable).selectExpr("path as file_path");
   }
 
   protected Dataset<Row> buildManifestListDF(SparkSession spark, Table table) {
-    List<String> manifestLists = getManifestListPaths(table);
+    List<String> manifestLists = getManifestListPaths(table.snapshots());
     return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path");
   }
 
+  protected Dataset<Row> buildManifestListDF(SparkSession spark, String metadataFileLocation) {
Member Author commented: You cannot pass a pure table name here since we aren't looking up the table using Spark; this path is for metadataFileLocation-based tables only.

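A short sketch of what this overload does underneath (the helper below is hypothetical; StaticTableOperations pins the table to one fixed metadata file, and the FileIO is reused from an already-loaded table):

    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.StaticTableOperations;
    import org.apache.iceberg.Table;

    class StaticTableSketch {
      static void printPinnedManifestLists(Table table, String metadataFileLocation) {
        // Read-only view of the table as of the given metadata file.
        StaticTableOperations staticOps =
            new StaticTableOperations(metadataFileLocation, table.io());
        Table pinned = new BaseTable(staticOps, table.toString());

        // Snapshots come from the pinned metadata file, not the live table.
        for (Snapshot snapshot : pinned.snapshots()) {
          System.out.println(snapshot.manifestListLocation());
        }
      }
    }
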
+    StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, table().io());
+    return buildManifestListDF(spark, new BaseTable(ops, table().toString()));
+  }
+
   protected Dataset<Row> buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) {
     List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
   protected Dataset<Row> buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) {
-    Dataset<Row> manifestDF = buildManifestFileDF(spark);
+    Dataset<Row> manifestDF = buildManifestFileDF(spark, table.toString());
     Dataset<Row> manifestListDF = buildManifestListDF(spark, table);
     Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(spark, ops);
 
spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -147,49 +148,40 @@ public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {

   @Override
   public ExpireSnapshotsActionResult execute() {
-    Dataset<Row> originalFiles = null;
-    try {
-      // Metadata before Expiration
-      originalFiles = buildValidFileDF().persist();
-      // Action to trigger persist
-      originalFiles.count();
-
-      // Perform Expiration
-      ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
-      for (final Long id : expireSnapshotIdValues) {
-        expireSnaps = expireSnaps.expireSnapshotId(id);
-      }
-
-      if (expireOlderThanValue != null) {
-        expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
-      }
-
-      if (retainLastValue != null) {
-        expireSnaps = expireSnaps.retainLast(retainLastValue);
-      }
-
-      expireSnaps.commit();
-
-      // Metadata after Expiration
-      Dataset<Row> validFiles = buildValidFileDF();
-      Dataset<Row> filesToDelete = originalFiles.except(validFiles);
-
-      return deleteFiles(filesToDelete.toLocalIterator());
-    } finally {
-      if (originalFiles != null) {
-        originalFiles.unpersist();
-      }
-    }
+    // Metadata before Expiration
+    Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+
+    // Perform Expiration
+    ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
+    for (final Long id : expireSnapshotIdValues) {
+      expireSnaps = expireSnaps.expireSnapshotId(id);
+    }
+
+    if (expireOlderThanValue != null) {
+      expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
+    }
+
+    if (retainLastValue != null) {
+      expireSnaps = expireSnaps.retainLast(retainLastValue);
+    }
+
+    expireSnaps.commit();
+
+    // Metadata after Expiration
+    Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+    Dataset<Row> filesToDelete = originalFiles.except(validFiles);
+
+    return deleteFiles(filesToDelete.toLocalIterator());
   }

   private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
     return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
   }
 
-  private Dataset<Row> buildValidFileDF() {
-    return appendTypeString(buildValidDataFileDF(spark), DATA_FILE)
-        .union(appendTypeString(buildManifestFileDF(spark), MANIFEST))
-        .union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST));
+  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
+    return appendTypeString(buildValidDataFileDF(spark, metadata.metadataFileLocation()), DATA_FILE)
+        .union(appendTypeString(buildManifestFileDF(spark, metadata.metadataFileLocation()), MANIFEST))
+        .union(appendTypeString(buildManifestListDF(spark, metadata.metadataFileLocation()), MANIFEST_LIST));
   }

   /**
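
For context, a minimal sketch of driving this action end to end; the entry point (Actions.forTable), the retention values, and the result accessor are assumptions based on this module's Spark actions API, not taken from this diff:

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.Actions;
    import org.apache.iceberg.actions.ExpireSnapshotsActionResult;

    class ExpireSnapshotsSketch {
      static void run(Table table) {
        // Expire snapshots older than a week but always keep the last 5;
        // execute() commits the expiration and then deletes the data,
        // manifest, and manifest-list files that are no longer referenced.
        ExpireSnapshotsActionResult result = Actions.forTable(table)
            .expireSnapshots()
            .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
            .retainLast(5)
            .execute();
        System.out.println("Data files deleted: " + result.dataFilesDeleted());
      }
    }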