@@ -631,6 +631,22 @@ private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.ice
return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
}

/**
* Returns a metadata table as a Dataset based on the given Iceberg table, read as of the given snapshot.
*
* @param spark SparkSession where the Dataset will be created
* @param table an Iceberg table
* @param type the type of metadata table
* @param snapshotId the id of the snapshot at which the metadata table is read
* @return a Dataset that will read the metadata table
*/
private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table,
MetadataTableType type, Long snapshotId) {
Table metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), snapshotId,
false);
return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
}

/**
* Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
* the {@link TableOperations} of the table may be stale; refresh the table to get the latest one.
@@ -605,15 +605,31 @@ private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
.orNoop()
.build();

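// resolves the snapshot-aware Spark3Util.loadMetadataTable overload reflectively; a no-op if the overload is not available (e.g. on Spark 2)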
private static final DynMethods.UnboundMethod LOAD_METADATA_TABLE_ASOF = DynMethods.builder("loadMetadataTable")
.hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, Table.class, MetadataTableType.class, Long.class)
.orNoop()
.build();

public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
Preconditions.checkArgument(!LOAD_METADATA_TABLE.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
return LOAD_METADATA_TABLE.asStatic().invoke(spark, table, type);
}

public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type, Long snapshotId) {
Preconditions.checkArgument(!LOAD_METADATA_TABLE_ASOF.isNoop(), "Cannot find the snapshot-aware Spark3Util.loadMetadataTable but Spark3 is in use");
return LOAD_METADATA_TABLE_ASOF.asStatic().invoke(spark, table, type, snapshotId);
}

public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
return loadMetadataTable(spark, table, type, null);
}
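
// Illustrative usage (hypothetical caller, not part of this change): read the "manifests"
// metadata table as it existed at a given snapshot, assuming `spark`, `icebergTable` and
// `snapshotId` are in scope:
//
//   Dataset<Row> manifestsAtSnapshot =
//       loadMetadataTable(spark, icebergTable, MetadataTableType.MANIFESTS, snapshotId);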

public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type, Long snapshot) {
if (spark.version().startsWith("3")) {
// construct the metadata table instance directly
Dataset<Row> catalogMetadataTable = snapshot == null ?
loadCatalogMetadataTable(spark, table, type) :
loadCatalogMetadataTable(spark, table, type, snapshot);
if (catalogMetadataTable != null) {
return catalogMetadataTable;
}
@@ -623,6 +639,11 @@ public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, Me
String tableLocation = table.location();

DataFrameReader dataFrameReader = spark.read().format("iceberg");

if (snapshot != null) {
dataFrameReader.option("snapshot-id", snapshot);
}

if (tableName.contains("/")) {
// Hadoop Table or Metadata location passed, load without a catalog
return dataFrameReader.load(tableName + "#" + type);
@@ -25,7 +25,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -40,9 +42,11 @@
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -146,8 +150,8 @@ public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc)
*/
public Dataset<Row> expire() {
if (expiredFiles == null) {
// save the current table metadata before expiration
TableMetadata originalTable = ops.current();

// perform expiration
org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false);
@@ -165,11 +169,20 @@ public Dataset<Row> expire() {

expireSnapshots.commit();

// refresh and capture the table metadata after expiration
TableMetadata updatedTable = ops.refresh();
Set<Long> updatedSnapshotIds = updatedTable.snapshots().stream()
.map(Snapshot::snapshotId)
.collect(Collectors.toSet());
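// snapshot ids that were present before the expiration but have since been removed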
Set<Long> removedSnapshotIds = originalTable.snapshots().stream()
.map(Snapshot::snapshotId)
.filter(originalId -> !updatedSnapshotIds.contains(originalId))
.collect(Collectors.toSet());

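// files referenced by the table after expiration, and files referenced by the removed snapshots in the original metadata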
Dataset<Row> validFiles = buildValidFileDF(updatedTable, null);
Dataset<Row> expiredSnapshotFiles = buildValidFileDF(originalTable, removedSnapshotIds);

// determine expired files
this.expiredFiles = expiredSnapshotFiles.except(validFiles);
}

return expiredFiles;
@@ -213,11 +226,17 @@ private ExpireSnapshots.Result doExecute() {
}
}

private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata, Set<Long> snapshots) {
Table staticTable = newStaticTable(metadata, this.table.io());
Dataset<Row> files = (snapshots == null) ? buildValidContentFileDF(table) :
buildFilteredContentFileDf(table, snapshots);
return appendTypeString(files, CONTENT_FILE)
.union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
}

/**
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
@@ -34,6 +35,7 @@
import org.apache.iceberg.actions.Action;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
@@ -50,6 +52,7 @@
import org.apache.spark.sql.SparkSession;

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.iceberg.MetadataTableType.MANIFESTS;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

@@ -122,18 +125,32 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}

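// builds a DF of delete and data file locations by reading the manifests of the given snapshots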
protected Dataset<Row> buildFilteredContentFileDf(Table table, Set<Long> snapshots) {
Preconditions.checkArgument(!snapshots.isEmpty(), "Cannot filter by an empty set of snapshot ids");
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));

Dataset<ManifestFileBean> result = snapshots.stream()
.map(s -> asManifestFileDs(loadMetadataTable(table, MANIFESTS, s)))
.reduce(Dataset::union).get();
return result.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
}

// builds a DF of delete and data file locations by reading all manifests
protected Dataset<Row> buildValidContentFileDF(Table table) {
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));

Dataset<ManifestFileBean> allManifests = asManifestFileDs(loadMetadataTable(table, ALL_MANIFESTS));
return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
}

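// projects a manifests metadata table into ManifestFileBean rows, de-duplicated by manifest path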
private Dataset<ManifestFileBean> asManifestFileDs(Dataset<Row> dataset) {
return dataset
.selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
.as(Encoders.bean(ManifestFileBean.class));
}

protected Dataset<Row> buildManifestFileDF(Table table) {
@@ -172,6 +189,10 @@ protected Dataset<Row> withFileType(Dataset<Row> ds, String type) {
return ds.withColumn(FILE_TYPE, lit(type));
}

protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type, long snapshot) {
return SparkTableUtil.loadMetadataTable(spark, table, type, snapshot);
}

protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
return SparkTableUtil.loadMetadataTable(spark, table, type);
}