diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 3d0b6a6a8c72..a03853bd3cf3 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -631,6 +631,22 @@ private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table,
     return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
   }
 
+  /**
+   * Returns a metadata table as a Dataset based on the given Iceberg table.
+   *
+   * @param spark SparkSession where the Dataset will be created
+   * @param table an Iceberg table
+   * @param type the type of metadata table
+   * @param snapshotId the snapshot ID at which the metadata table is read
+   * @return a Dataset that will read the metadata table
+   */
+  private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table,
+      MetadataTableType type, Long snapshotId) {
+    Table metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), snapshotId,
+        false);
+    return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
+  }
+
   /**
    * Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
    * the {@link TableOperations} of the table may be stale, please refresh the table to get the latest one.
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index e6c90cbccfd3..181b59d4ef86 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -605,15 +605,31 @@ private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
       .orNoop()
       .build();
 
+  private static final DynMethods.UnboundMethod LOAD_METADATA_TABLE_ASOF = DynMethods.builder("loadMetadataTable")
+      .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, Table.class, MetadataTableType.class, Long.class)
+      .orNoop()
+      .build();
+
   public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
     Preconditions.checkArgument(!LOAD_METADATA_TABLE.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
     return LOAD_METADATA_TABLE.asStatic().invoke(spark, table, type);
   }
 
+  public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type, Long snapshotId) {
+    Preconditions.checkArgument(!LOAD_METADATA_TABLE_ASOF.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
+    return LOAD_METADATA_TABLE_ASOF.asStatic().invoke(spark, table, type, snapshotId);
+  }
+
   public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
+    return loadMetadataTable(spark, table, type, null);
+  }
+
+  public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type, Long snapshot) {
     if (spark.version().startsWith("3")) {
       // construct the metadata table instance directly
-      Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(spark, table, type);
+      Dataset<Row> catalogMetadataTable = snapshot == null ?
+          loadCatalogMetadataTable(spark, table, type) :
+          loadCatalogMetadataTable(spark, table, type, snapshot);
       if (catalogMetadataTable != null) {
         return catalogMetadataTable;
       }
@@ -623,6 +639,11 @@ public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type,
     String tableLocation = table.location();
 
     DataFrameReader dataFrameReader = spark.read().format("iceberg");
+
+    if (snapshot != null) {
+      dataFrameReader.option("snapshot-id", snapshot);
+    }
+
     if (tableName.contains("/")) {
       // Hadoop Table or Metadata location passed, load without a catalog
       return dataFrameReader.load(tableName + "#" + type);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 310081d18edb..d56711e7d7c3 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -25,7 +25,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -40,9 +42,11 @@
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -146,8 +150,8 @@ public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
    */
   public Dataset<Row> expire() {
     if (expiredFiles == null) {
-      // fetch metadata before expiration
-      Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+      // Save old metadata
+      TableMetadata originalTable = ops.current();
 
       // perform expiration
       org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false);
@@ -165,11 +169,20 @@ public Dataset<Row> expire() {
       expireSnapshots.commit();
 
-      // fetch metadata after expiration
-      Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+      TableMetadata updatedTable = ops.refresh();
+      Set<Long> updatedSnapshotIds = updatedTable.snapshots().stream()
+          .map(Snapshot::snapshotId)
+          .collect(Collectors.toSet());
+      Set<Long> removedSnapshotIds = originalTable.snapshots().stream()
+          .map(Snapshot::snapshotId)
+          .filter(originalId -> !updatedSnapshotIds.contains(originalId))
+          .collect(Collectors.toSet());
+
+      Dataset<Row> validFiles = buildValidFileDF(updatedTable, null);
+      Dataset<Row> expiredSnapshotFiles = buildValidFileDF(originalTable, removedSnapshotIds);
 
       // determine expired files
-      this.expiredFiles = originalFiles.except(validFiles);
+      this.expiredFiles = expiredSnapshotFiles.except(validFiles);
     }
 
     return expiredFiles;
@@ -213,11 +226,17 @@ private ExpireSnapshots.Result doExecute() {
     }
   }
 
-  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
-    Table staticTable = newStaticTable(metadata, table.io());
-    return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
-        .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
+  private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
+    return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
+  }
+
+  private Dataset<Row> buildValidFileDF(TableMetadata metadata, Set<Long> snapshots) {
+    Table staticTable = newStaticTable(metadata, this.table.io());
+    Dataset<Row> files = (snapshots == null) ? buildValidContentFileDF(staticTable) :
+        buildFilteredContentFileDf(staticTable, snapshots);
+    return appendTypeString(files, CONTENT_FILE)
+        .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
+        .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
   }
 
   /**
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index f8c5e454b0ca..929c34ed29fc 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.iceberg.BaseTable;
@@ -34,6 +35,7 @@
 import org.apache.iceberg.actions.Action;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
@@ -50,6 +52,7 @@
 import org.apache.spark.sql.SparkSession;
 
 import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
+import static org.apache.iceberg.MetadataTableType.MANIFESTS;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.lit;
@@ -122,18 +125,32 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
+  protected Dataset<Row> buildFilteredContentFileDf(Table table, Set<Long> snapshots) {
+    Preconditions.checkArgument(snapshots.size() > 0, "Cannot filter by empty snapshot groups");
+    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
+
+    Dataset<ManifestFileBean> result = snapshots.stream()
+        .map(s -> asManifestFileDs(loadMetadataTable(table, MANIFESTS, s)))
+        .reduce(Dataset::union).get();
+    return result.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
+  }
+
   // builds a DF of delete and data file locations by reading all manifests
   protected Dataset<Row> buildValidContentFileDF(Table table) {
     JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
     Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
 
-    Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
+    Dataset<ManifestFileBean> allManifests = asManifestFileDs(loadMetadataTable(table, ALL_MANIFESTS));
+    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
+  }
+
+  private Dataset<ManifestFileBean> asManifestFileDs(Dataset<Row> dataset) {
+    return dataset
         .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
         .as(Encoders.bean(ManifestFileBean.class));
-
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
   }
 
   protected Dataset<Row> buildManifestFileDF(Table table) {
@@ -172,6 +189,10 @@ protected Dataset<Row> withFileType(Dataset<Row> ds, String type) {
     return ds.withColumn(FILE_TYPE, lit(type));
   }
 
+  protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type, long snapshot) {
+    return SparkTableUtil.loadMetadataTable(spark, table, type, snapshot);
+  }
+
   protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
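
For reference, a minimal usage sketch of the snapshot-scoped metadata table loading added above. The SparkSession setup, Hadoop table location, and class name are illustrative assumptions, not part of the change; any way of obtaining an org.apache.iceberg.Table handle works the same with SparkTableUtil.loadMetadataTable.

import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SnapshotScopedManifestsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("snapshot-scoped-manifests")
        .getOrCreate();

    // Hypothetical Hadoop table location; substitute however the table is normally loaded.
    Table table = new HadoopTables(spark.sessionState().newHadoopConf())
        .load("hdfs://namenode:8020/warehouse/db/events");

    // Assumes the table has at least one snapshot.
    long snapshotId = table.currentSnapshot().snapshotId();

    // New overload introduced by this change: read the MANIFESTS metadata table
    // as of a specific snapshot instead of the current table state.
    Dataset<Row> manifests =
        SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.MANIFESTS, snapshotId);

    manifests.show(false);
  }
}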
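
The reworked expire() above derives the set of removed snapshots by diffing the table metadata captured before and after the expiration commit, and only then builds the candidate file list from those snapshots' manifests. A standalone sketch of that diff step follows; the class and method names here are illustrative, not part of the change.

import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;

final class SnapshotDiff {
  private SnapshotDiff() {
  }

  // Snapshot ids present before expiration but absent afterwards; files reachable
  // only from these snapshots are deletion candidates once files still referenced
  // by the surviving snapshots are subtracted out (the except() call in expire()).
  static Set<Long> removedSnapshotIds(TableMetadata before, TableMetadata after) {
    Set<Long> surviving = after.snapshots().stream()
        .map(Snapshot::snapshotId)
        .collect(Collectors.toSet());
    return before.snapshots().stream()
        .map(Snapshot::snapshotId)
        .filter(id -> !surviving.contains(id))
        .collect(Collectors.toSet());
  }
}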
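
Finally, for orientation, a hedged sketch of driving the expire-snapshots Spark action whose file-listing internals change above, assuming the SparkActions entry point of this module; the retention threshold and retained-snapshot count are placeholder values.

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class ExpireSnapshotsExample {
  public static void expireOldSnapshots(SparkSession spark, Table table) {
    // Expire snapshots older than seven days, keeping at least the last two (placeholders).
    long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);

    ExpireSnapshots.Result result = SparkActions.get(spark)
        .expireSnapshots(table)
        .expireOlderThan(olderThanMillis)
        .retainLast(2)
        .execute();

    // With this change, the deleted-file listing is computed from the manifests of the
    // snapshots that were actually removed, minus files still referenced by live snapshots.
    System.out.println("Deleted data files: " + result.deletedDataFilesCount());
  }
}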