diff --git a/docs/spark/spark-procedures.md b/docs/spark/spark-procedures.md
index e687338147ab..a9815bb4e245 100644
--- a/docs/spark/spark-procedures.md
+++ b/docs/spark/spark-procedures.md
@@ -202,6 +202,7 @@ the `expire_snapshots` procedure will never remove files which are still require
 | `retain_last` | | int | Number of ancestor snapshots to preserve regardless of `older_than` (defaults to 1) |
 | `max_concurrent_deletes` | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
 | `stream_results` | | boolean | When true, deletion files will be sent to Spark driver by RDD partition (by default, all the files will be sent to Spark driver). This option is recommended to set to `true` to prevent Spark driver OOM from large file size |
+| `use_caching` | | boolean | Use Spark caching during operation (defaults to true) |
 
 If `older_than` and `retain_last` are omitted, the table's [expiration properties](./configuration/#table-behavior-properties) will be used.
 
@@ -234,6 +235,7 @@ Used to remove files which are not referenced in any metadata files of an Iceber
 | `location` | | string | Directory to look for files in (defaults to the table's location) |
 | `dry_run` | | boolean | When true, don't actually remove files (defaults to false) |
 | `max_concurrent_deletes` | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
+| `use_caching` | | boolean | Use Spark caching during operation (defaults to true) |
 
 #### Output
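Usage note: the new `use_caching` argument is passed like the other optional procedure arguments documented above. A minimal sketch of invoking both procedures from Java via `spark.sql`; the catalog name `my_catalog` and the table `db.tbl` are placeholders, not part of this change, and the session is assumed to be configured with an Iceberg catalog:

    import org.apache.spark.sql.SparkSession;

    // Illustrative only: calling the procedures documented above with the new use_caching flag.
    // "my_catalog" and "db.tbl" are placeholder names.
    public class UseCachingProcedureExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // expire snapshots without caching the all_manifests metadata table
        spark.sql("CALL my_catalog.system.expire_snapshots(table => 'db.tbl', use_caching => false)");

        // remove orphan files with caching disabled (dry run shown)
        spark.sql("CALL my_catalog.system.remove_orphan_files(table => 'db.tbl', dry_run => true, use_caching => false)");

        spark.stop();
      }
    }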
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index dc58a05d4d0d..e601431ebea9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -68,6 +68,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
@@ -119,6 +120,9 @@ public void accept(String file) {
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
 
+  private static final String USE_CACHING = "use-caching";
+  private static final boolean USE_CACHING_DEFAULT = true;
+
   public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
 
@@ -208,8 +212,13 @@ private String jobDesc() {
   }
 
   private DeleteOrphanFiles.Result doExecute() {
-    Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
-    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
+    Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
+    boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
+    if (useCaching) {
+      allManifests.persist();
+    }
+    Dataset<Row> validContentFileDF = buildValidContentFileDF(table, allManifests);
+    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table, allManifests);
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
 
@@ -221,7 +230,9 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
         .as(Encoders.STRING())
         .collectAsList();
-
+    if (useCaching) {
+      allManifests.unpersist();
+    }
     Tasks.foreach(orphanFiles)
         .noRetry()
         .executeWith(deleteExecutorService)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index fc2a5fa5cf00..2bab57b3d7a2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -42,6 +42,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
@@ -69,6 +70,11 @@ public void accept(String file) {
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
   private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
+  private Dataset<Row> allManifests = null;
+  private boolean useCaching = true;
+
+  private static final String USE_CACHING = "use-caching";
+  private static final boolean USE_CACHING_DEFAULT = true;
 
   public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
     super(spark);
@@ -116,17 +122,27 @@ private Result doExecute() {
     Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();
 
     boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
+    Result result;
     if (streamResults) {
-      return deleteFiles(reachableFileDF.toLocalIterator());
+      result = deleteFiles(reachableFileDF.toLocalIterator());
     } else {
-      return deleteFiles(reachableFileDF.collectAsList().iterator());
+      result = deleteFiles(reachableFileDF.collectAsList().iterator());
+    }
+    if (useCaching) {
+      allManifests.unpersist();
     }
+    return result;
   }
 
   private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, io);
-    return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
+    allManifests = loadMetadataTable(staticTable, ALL_MANIFESTS);
+    useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
+    if (useCaching) {
+      allManifests.persist();
+    }
+    return withFileType(buildValidContentFileDF(staticTable, allManifests), CONTENT_FILE)
+        .union(withFileType(buildManifestFileDF(allManifests), MANIFEST))
        .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
        .union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
   }
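Both delete actions above follow the same pattern: load the `all_manifests` metadata table once, persist it when `use-caching` is enabled, derive the content-file and manifest-file DataFrames from that single Dataset, and unpersist it only after the results have been materialized on the driver. A generic sketch of that cache-once/derive-many/unpersist-after-collect pattern, using a plain DataFrame rather than Iceberg's metadata tables:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Illustrative only: the persist/derive/unpersist pattern applied by the actions above.
    public class PersistOncePattern {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        boolean useCaching = true; // in the actions this comes from options() via PropertyUtil

        Dataset<Row> base = spark.range(1000).toDF("id"); // stands in for loadMetadataTable(table, ALL_MANIFESTS)
        if (useCaching) {
          base.persist(); // compute the scan once instead of once per derived DataFrame
        }

        Dataset<Row> derivedA = base.filter("id % 2 = 0"); // stands in for buildValidContentFileDF
        Dataset<Row> derivedB = base.filter("id % 2 = 1"); // stands in for buildManifestFileDF
        long count = derivedA.union(derivedB).count();     // materialize the combined result

        if (useCaching) {
          base.unpersist(); // release executor storage once the result is on the driver
        }
        System.out.println(count);
        spark.stop();
      }
    }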
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 310081d18edb..e7bd772656cd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -27,7 +27,6 @@
 import java.util.function.Consumer;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
 import org.apache.iceberg.actions.ExpireSnapshots;
@@ -46,6 +45,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
@@ -87,6 +87,12 @@ public void accept(String file) {
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
   private Dataset<Row> expiredFiles = null;
+  private Dataset<Row> allManifestsBefore = null;
+  private Dataset<Row> allManifestsAfter = null;
+  private boolean useCaching = true;
+
+  private static final String USE_CACHING = "use-caching";
+  private static final boolean USE_CACHING_DEFAULT = true;
 
   public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -147,7 +153,13 @@ public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc)
   public Dataset<Row> expire() {
     if (expiredFiles == null) {
       // fetch metadata before expiration
-      Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+      Table staticTableBefore = newStaticTable(ops.current(), table.io());
+      allManifestsBefore = loadMetadataTable(staticTableBefore, ALL_MANIFESTS);
+      useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
+      if (useCaching) {
+        allManifestsBefore.persist();
+      }
+      Dataset<Row> originalFiles = buildValidFileDF(staticTableBefore, allManifestsBefore);
 
       // perform expiration
       org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false);
@@ -166,7 +178,12 @@ public Dataset<Row> expire() {
       expireSnapshots.commit();
 
       // fetch metadata after expiration
-      Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+      Table staticTableAfter = newStaticTable(ops.refresh(), table.io());
+      allManifestsAfter = loadMetadataTable(staticTableAfter, ALL_MANIFESTS);
+      if (useCaching) {
+        allManifestsAfter.persist();
+      }
+      Dataset<Row> validFiles = buildValidFileDF(staticTableAfter, allManifestsAfter);
 
       // determine expired files
       this.expiredFiles = originalFiles.except(validFiles);
@@ -206,17 +223,22 @@ private String jobDesc() {
 
   private ExpireSnapshots.Result doExecute() {
     boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
+    BaseExpireSnapshotsActionResult result;
     if (streamResults) {
-      return deleteFiles(expire().toLocalIterator());
+      result = deleteFiles(expire().toLocalIterator());
     } else {
-      return deleteFiles(expire().collectAsList().iterator());
+      result = deleteFiles(expire().collectAsList().iterator());
+    }
+    if (useCaching) {
+      allManifestsBefore.unpersist();
+      allManifestsAfter.unpersist();
     }
+    return result;
   }
 
-  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
-    Table staticTable = newStaticTable(metadata, table.io());
-    return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
+  private Dataset<Row> buildValidFileDF(Table staticTable, Dataset<Row> allManifests) {
+    return withFileType(buildValidContentFileDF(staticTable, allManifests), CONTENT_FILE)
+        .union(withFileType(buildManifestFileDF(allManifests), MANIFEST))
         .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
   }
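When the expiration is driven through the action API rather than the procedure, the flag is supplied as a string option whose key matches the `USE_CACHING` constant added above. A sketch, assuming an already-loaded Iceberg `Table` and an arbitrary cutoff timestamp:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Illustrative only: disabling manifest caching when calling the action directly.
    public class ExpireWithoutCaching {
      static ExpireSnapshots.Result expire(Table table, long olderThanMillis) {
        return SparkActions.get()
            .expireSnapshots(table)
            .expireOlderThan(olderThanMillis)
            .option("use-caching", "false")
            .execute();
      }
    }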
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index f8c5e454b0ca..d44439e8fc84 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -49,7 +49,6 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
-import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.lit;
 
@@ -123,11 +122,11 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
   }
 
   // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  protected Dataset<Row> buildValidContentFileDF(Table table, Dataset<Row> allManifestsDf) {
     JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
     Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
 
-    Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
+    Dataset<ManifestFileBean> allManifests = allManifestsDf
         .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
@@ -136,8 +135,8 @@ protected Dataset<Row> buildValidContentFileDF(Table table) {
     return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
   }
 
-  protected Dataset<Row> buildManifestFileDF(Table table) {
-    return loadMetadataTable(table, ALL_MANIFESTS).select(col("path").as(FILE_PATH));
+  protected Dataset<Row> buildManifestFileDF(Dataset<Row> allManifestsDf) {
+    return allManifestsDf.select(col("path").as(FILE_PATH));
   }
 
   protected Dataset<Row> buildManifestListDF(Table table) {
@@ -160,8 +159,8 @@ private Dataset<Row> buildOtherMetadataFileDF(Table table, boolean includePrevio
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF(FILE_PATH);
   }
 
-  protected Dataset<Row> buildValidMetadataFileDF(Table table) {
-    Dataset<Row> manifestDF = buildManifestFileDF(table);
+  protected Dataset<Row> buildValidMetadataFileDF(Table table, Dataset<Row> allManifests) {
+    Dataset<Row> manifestDF = buildManifestFileDF(allManifests);
     Dataset<Row> manifestListDF = buildManifestListDF(table);
     Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(table);
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 8cf4d275cd9e..1a4a77fb3352 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -47,7 +47,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
       ProcedureParameter.optional("older_than", DataTypes.TimestampType),
       ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
       ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-      ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+      ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+      ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
   };
 
   private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -86,6 +87,7 @@ public InternalRow[] call(InternalRow args) {
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
     Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+    Boolean useCaching = args.isNullAt(5) ? null : args.getBoolean(5);
 
     Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -101,7 +103,7 @@ public InternalRow[] call(InternalRow args) {
         action.retainLast(retainLastNum);
       }
 
-      if (maxConcurrentDeletes != null && maxConcurrentDeletes > 0) {
+      if (maxConcurrentDeletes != null) {
         action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
       }
 
@@ -109,6 +111,10 @@ public InternalRow[] call(InternalRow args) {
         action.option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
       }
 
+      if (useCaching != null) {
+        action.option("use-caching", useCaching.toString());
+      }
+
       ExpireSnapshots.Result result = action.execute();
 
       return toOutputRows(result);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 23d64719081c..53ba73073532 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -51,7 +51,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
       ProcedureParameter.optional("location", DataTypes.StringType),
       ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
       ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-      ProcedureParameter.optional("file_list_view", DataTypes.StringType)
+      ProcedureParameter.optional("file_list_view", DataTypes.StringType),
+      ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
   };
 
   private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -90,6 +91,7 @@ public InternalRow[] call(InternalRow args) {
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
     Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
     String fileListView = args.isNullAt(5) ? null : args.getString(5);
+    Boolean useCaching = args.isNullAt(6) ? null : args.getBoolean(6);
 
     Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -113,7 +115,7 @@ public InternalRow[] call(InternalRow args) {
         action.deleteWith(file -> { });
       }
 
-      if (maxConcurrentDeletes != null && maxConcurrentDeletes > 0) {
+      if (maxConcurrentDeletes != null) {
         action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));
       }
 
@@ -121,6 +123,10 @@ public InternalRow[] call(InternalRow args) {
         ((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
       }
 
+      if (useCaching != null) {
+        action.option("use-caching", useCaching.toString());
+      }
+
       DeleteOrphanFiles.Result result = action.execute();
 
       return toOutputRows(result);
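For completeness, the string option set by the procedures is read back inside the actions through `PropertyUtil.propertyAsBoolean`, with `USE_CACHING_DEFAULT` as the fallback when the key is absent. A tiny sketch of that round trip, with a plain `Map` standing in for the action's `options()`:

    import java.util.Map;
    import org.apache.iceberg.util.PropertyUtil;

    // Illustrative only: how the procedure-supplied flag is parsed inside the actions.
    public class OptionRoundTrip {
      public static void main(String[] args) {
        Map<String, String> options = Map.of("use-caching", "false"); // stands in for options()
        boolean useCaching = PropertyUtil.propertyAsBoolean(options, "use-caching", true);
        System.out.println(useCaching); // prints false; an absent key would fall back to true
      }
    }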