docs/spark/spark-procedures.md (2 additions, 0 deletions)
@@ -202,6 +202,7 @@ the `expire_snapshots` procedure will never remove files which are still required
| `retain_last` | | int | Number of ancestor snapshots to preserve regardless of `older_than` (defaults to 1) |
| `max_concurrent_deletes` | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
| `stream_results` | | boolean | When true, files to be deleted are sent to the Spark driver one RDD partition at a time (by default, all files are collected on the driver at once). Setting this to `true` is recommended to prevent driver OOM when a large number of files are deleted |
| `use_caching` | | boolean | Use Spark caching during operation (defaults to true) |

If `older_than` and `retain_last` are omitted, the table's [expiration properties](./configuration/#table-behavior-properties) will be used.
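
For context, a call that turns the new flag off might look like this (an illustrative sketch, not part of the diff; the catalog and table names are placeholders):

```sql
CALL my_catalog.system.expire_snapshots(table => 'db.sample', use_caching => false)
```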

@@ -234,6 +235,7 @@ Used to remove files which are not referenced in any metadata files of an Iceberg
| `location` | | string | Directory to look for files in (defaults to the table's location) |
| `dry_run` | | boolean | When true, don't actually remove files (defaults to false) |
| `max_concurrent_deletes` | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
| `use_caching` | | boolean | Use Spark caching during operation (defaults to true) |

#### Output

BaseDeleteOrphanFilesSparkAction.java
@@ -68,6 +68,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;

@@ -119,6 +120,9 @@ public void accept(String file) {
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;

private static final String USE_CACHING = "use-caching";
private static final boolean USE_CACHING_DEFAULT = true;

public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);

@@ -208,8 +212,13 @@ private String jobDesc() {
}

private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
if (useCaching) {
allManifests.persist();
}
Dataset<Row> validContentFileDF = buildValidContentFileDF(table, allManifests);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table, allManifests);
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();

@@ -221,7 +230,9 @@ private DeleteOrphanFiles.Result doExecute() {
List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
.as(Encoders.STRING())
.collectAsList();

if (useCaching) {
Member:
this needs to be in a `finally` block, otherwise if we hit an error in the `collectAsList` there is a possibility that we never uncache. For services running these actions, that would be a problem.
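
A minimal sketch of the shape being requested, using the fields from this diff (`useCaching`, `allManifests`, `actualFileDF`, `validFileDF`, `joinCond`); illustrative only, not the committed code:

```java
List<String> orphanFiles;
try {
  // the join or the collect can fail; the cache must still be released
  orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
      .as(Encoders.STRING())
      .collectAsList();
} finally {
  if (useCaching) {
    allManifests.unpersist();
  }
}
// deletion of orphanFiles proceeds as before
```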

allManifests.unpersist();
}
Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteExecutorService)
BaseDeleteReachableFilesSparkAction.java
@@ -42,6 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;

@@ -69,6 +70,11 @@ public void accept(String file) {
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
private Dataset<Row> allManifests = null;
private boolean useCaching = true;

private static final String USE_CACHING = "use-caching";
private static final boolean USE_CACHING_DEFAULT = true;

public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
super(spark);
@@ -116,17 +122,27 @@ private Result doExecute() {
Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();

boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
Result result;
if (streamResults) {
return deleteFiles(reachableFileDF.toLocalIterator());
result = deleteFiles(reachableFileDF.toLocalIterator());
} else {
return deleteFiles(reachableFileDF.collectAsList().iterator());
result = deleteFiles(reachableFileDF.collectAsList().iterator());
}
if (useCaching) {
allManifests.unpersist();
Member:
Needs to be in a finally block

}
return result;
}

private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
allManifests = loadMetadataTable(staticTable, ALL_MANIFESTS);
useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
if (useCaching) {
allManifests.persist();
}
return withFileType(buildValidContentFileDF(staticTable, allManifests), CONTENT_FILE)
.union(withFileType(buildManifestFileDF(allManifests), MANIFEST))
Member:
How does this change the performance? Don't we have to compute all manifests in both locations here? Or does changing to an object let Spark cache the relation?

Member (Author):
loadMetadataTable was called twice; now it is called only once for ALL_MANIFESTS.

Member (Author):
> Or does changing to an object let Spark cache the relation?

I thought the computation of the dataset would happen on the first action and the results would be reused for both steps.

@ajantha-bhat (Member, Author), May 1, 2022:

@RussellSpitzer: I got what you meant now.

I will fix this by adding persist(); it will cache and reuse the dataset, and in the end it avoids reading the manifest list twice.

Member (Author):
Fixed, and verified the scanning by adding and checking logs.

Member:
Ah yes, I should have been clearer: I was referring to the fact that the dataset would be recomputed on both lines. The `loadMetadataTable` function should be very fast, but the actual planning would be expensive, and that would require a cache of some kind.

I'm a little worried in general about persisting things, since I want to make sure we clean up our caches as soon as possible.
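
A rough illustration of the recompute-versus-cache point, using the helpers touched by this PR; `count()` just stands in for whichever action first touches the data, and the snippet is a sketch rather than code from the change:

```java
Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);

// Unpersisted: every action re-plans and re-scans the ALL_MANIFESTS metadata table.
buildValidContentFileDF(table, allManifests).count();   // scan #1
buildManifestFileDF(allManifests).count();              // scan #2

// Persisted: the first action materializes the partitions, the second reuses them,
// and unpersist() releases the executor storage as soon as both consumers are done.
allManifests.persist();
buildValidContentFileDF(table, allManifests).count();   // scans once and fills the cache
buildManifestFileDF(allManifests).count();              // served from the cache
allManifests.unpersist();
```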

Member:
One other thing to worry about here is the additional cost of the persist operation itself. Running a persist is not free, and we need to check that this cache is actually cost effective. In my experience, 3 uses of a persisted df is usually worth it, but 2 sometimes is not (very much dependent on the computation leading up to the cached df).

Member (Author):
> Very much dependent on the computation leading up to the cached df

Because it involves IO, it will definitely help when there are hundreds or thousands of snapshots.

Member (Author):
@RussellSpitzer, which is better: a cache, or manually calling dataset.collect() on that allManifests df, building a new dataset on top of the collected rows, and reusing it in both locations?

@RussellSpitzer (Member), May 2, 2022:

@ajantha-bhat The problem is that persisting does another IO serialization of what should be basically the same amount of information, but hopefully in a more readable form. Persist by default is a memory-and-disk based cache. You really need to test it out to be sure.

For cache vs collect: cache is probably much better. Pieces would be stored on the executors and the IO would hopefully be mostly local. Doing a collect and building a DF from it would essentially bring all the data back to the driver, serialize it, then deserialize it and send everything back out. This is always worse than cache/persist.
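
To make the comparison concrete, a rough sketch of the two options being weighed (illustrative only; `spark` is the active `SparkSession`, and the `StorageLevel`, `List`, and `Row` imports are assumed):

```java
// Option 1: persist. Partitions are cached on the executors and later uses read them locally.
allManifests.persist(StorageLevel.MEMORY_AND_DISK());  // the level persist() uses by default

// Option 2: collect and rebuild. Every row is pulled back to the driver, serialized,
// then shipped out again when the rebuilt Dataset is used downstream.
List<Row> rows = allManifests.collectAsList();
Dataset<Row> rebuilt = spark.createDataFrame(rows, allManifests.schema());
```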

.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
}
BaseExpireSnapshotsSparkAction.java
@@ -27,7 +27,6 @@
import java.util.function.Consumer;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
import org.apache.iceberg.actions.ExpireSnapshots;
@@ -46,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;

@@ -87,6 +87,12 @@ public void accept(String file) {
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;
private Dataset<Row> expiredFiles = null;
private Dataset<Row> allManifestsBefore = null;
private Dataset<Row> allManifestsAfter = null;
private boolean useCaching = true;

private static final String USE_CACHING = "use-caching";
private static final boolean USE_CACHING_DEFAULT = true;

public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -147,7 +153,13 @@ public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc)
public Dataset<Row> expire() {
if (expiredFiles == null) {
// fetch metadata before expiration
Dataset<Row> originalFiles = buildValidFileDF(ops.current());
Table staticTableBefore = newStaticTable(ops.current(), table.io());
allManifestsBefore = loadMetadataTable(staticTableBefore, ALL_MANIFESTS);
useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
if (useCaching) {
allManifestsBefore.persist();
Member:
I think we would want to do the "memory only" cache here (and in all the other usages) ... but I'm not sure
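
If that direction were taken, the call would presumably look like the sketch below; whether `MEMORY_ONLY` is actually the right trade-off is exactly what the comment leaves open:

```java
// cache in executor memory only; partitions that do not fit are recomputed rather than spilled to disk
allManifestsBefore.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY());
```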

}
Dataset<Row> originalFiles = buildValidFileDF(staticTableBefore, allManifestsBefore);

// perform expiration
org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false);
@@ -166,7 +178,12 @@ public Dataset<Row> expire() {
expireSnapshots.commit();

// fetch metadata after expiration
Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
Table staticTableAfter = newStaticTable(ops.refresh(), table.io());
allManifestsAfter = loadMetadataTable(staticTableAfter, ALL_MANIFESTS);
if (useCaching) {
allManifestsAfter.persist();
}
Dataset<Row> validFiles = buildValidFileDF(staticTableAfter, allManifestsAfter);

// determine expired files
this.expiredFiles = originalFiles.except(validFiles);
@@ -206,17 +223,22 @@ private String jobDesc() {

private ExpireSnapshots.Result doExecute() {
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
BaseExpireSnapshotsActionResult result;
if (streamResults) {
return deleteFiles(expire().toLocalIterator());
result = deleteFiles(expire().toLocalIterator());
} else {
return deleteFiles(expire().collectAsList().iterator());
result = deleteFiles(expire().collectAsList().iterator());
}
if (useCaching) {
allManifestsBefore.unpersist();
allManifestsAfter.unpersist();
}
return result;
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, table.io());
return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
private Dataset<Row> buildValidFileDF(Table staticTable, Dataset<Row> allManifests) {
return withFileType(buildValidContentFileDF(staticTable, allManifests), CONTENT_FILE)
.union(withFileType(buildManifestFileDF(allManifests), MANIFEST))
.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
}

BaseSparkAction.java
@@ -49,7 +49,6 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

@@ -123,11 +122,11 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
}

// builds a DF of delete and data file locations by reading all manifests
protected Dataset<Row> buildValidContentFileDF(Table table) {
protected Dataset<Row> buildValidContentFileDF(Table table, Dataset<Row> allManifestsDf) {
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));

Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
Dataset<ManifestFileBean> allManifests = allManifestsDf
.selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
@@ -136,8 +135,8 @@ protected Dataset<Row> buildValidContentFileDF(Table table) {
return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
}

protected Dataset<Row> buildManifestFileDF(Table table) {
return loadMetadataTable(table, ALL_MANIFESTS).select(col("path").as(FILE_PATH));
protected Dataset<Row> buildManifestFileDF(Dataset<Row> allManifestsDf) {
return allManifestsDf.select(col("path").as(FILE_PATH));
}

protected Dataset<Row> buildManifestListDF(Table table) {
@@ -160,8 +159,8 @@ private Dataset<Row> buildOtherMetadataFileDF(Table table, boolean includePrevio
return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF(FILE_PATH);
}

protected Dataset<Row> buildValidMetadataFileDF(Table table) {
Dataset<Row> manifestDF = buildManifestFileDF(table);
protected Dataset<Row> buildValidMetadataFileDF(Table table, Dataset<Row> allManifests) {
Dataset<Row> manifestDF = buildManifestFileDF(allManifests);
Dataset<Row> manifestListDF = buildManifestListDF(table);
Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(table);

ExpireSnapshotsProcedure.java
@@ -47,7 +47,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -86,6 +87,7 @@ public InternalRow[] call(InternalRow args) {
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
Boolean useCaching = args.isNullAt(5) ? null : args.getBoolean(5);

Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -101,14 +103,18 @@ public InternalRow[] call(InternalRow args) {
action.retainLast(retainLastNum);
}

if (maxConcurrentDeletes != null && maxConcurrentDeletes > 0) {
Member (Author):
maxConcurrentDeletes > 0 is already checked in the preconditions above.

Member:
It's best to leave changes like this for a separate cleanup PR.

if (maxConcurrentDeletes != null) {
action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
}

if (streamResult != null) {
action.option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
}

if (useCaching != null) {
action.option("use_caching", useCaching.toString());
}

ExpireSnapshots.Result result = action.execute();

return toOutputRows(result);
RemoveOrphanFilesProcedure.java
@@ -51,7 +51,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
ProcedureParameter.optional("file_list_view", DataTypes.StringType)
ProcedureParameter.optional("file_list_view", DataTypes.StringType),
ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -90,6 +91,7 @@ public InternalRow[] call(InternalRow args) {
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
String fileListView = args.isNullAt(5) ? null : args.getString(5);
Boolean useCaching = args.isNullAt(6) ? null : args.getBoolean(6);

Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -113,14 +115,18 @@ public InternalRow[] call(InternalRow args) {
action.deleteWith(file -> { });
}

if (maxConcurrentDeletes != null && maxConcurrentDeletes > 0) {
if (maxConcurrentDeletes != null) {
action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));
}

if (fileListView != null) {
((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
}

if (useCaching != null) {
action.option("use_caching", useCaching.toString());
}

DeleteOrphanFiles.Result result = action.execute();

return toOutputRows(result);