diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java new file mode 100644 index 000000000000..5b10461b1bf4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result { + + private final long deletedDataFilesCount; + private final long deletedManifestsCount; + private final long deletedManifestListsCount; + + public BaseExpireSnapshotsActionResult(long deletedDataFilesCount, + long deletedManifestsCount, + long deletedManifestListsCount) { + this.deletedDataFilesCount = deletedDataFilesCount; + this.deletedManifestsCount = deletedManifestsCount; + this.deletedManifestListsCount = deletedManifestListsCount; + } + + @Override + public long deletedDataFilesCount() { + return deletedDataFilesCount; + } + + @Override + public long deletedManifestsCount() { + return deletedManifestsCount; + } + + @Override + public long deletedManifestListsCount() { + return deletedManifestListsCount; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java b/spark/src/main/java/org/apache/iceberg/actions/Actions.java index 8c39e698f260..0610fe9e9008 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java +++ b/spark/src/main/java/org/apache/iceberg/actions/Actions.java @@ -22,6 +22,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction; import org.apache.spark.sql.SparkSession; public class Actions { @@ -79,7 +80,8 @@ public RewriteDataFilesAction rewriteDataFiles() { } public ExpireSnapshotsAction expireSnapshots() { - return new ExpireSnapshotsAction(spark, table); + BaseExpireSnapshotsSparkAction delegate = new BaseExpireSnapshotsSparkAction(spark, table); + return new ExpireSnapshotsAction(delegate); } /** diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateSparkAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateSparkAction.java index c69e1bf0ecaf..7545802ccbf5 100644 --- 
a/spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateSparkAction.java @@ -32,8 +32,6 @@ protected BaseSnapshotUpdateSparkAction(SparkSession spark) { super(spark); } - protected abstract ThisT self(); - @Override public ThisT snapshotProperty(String property, String value) { summary.put(property, value); diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java index adcc9159e735..01d05ff2137a 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.iceberg.BaseTable; @@ -36,6 +37,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.JobGroupUtils; import org.apache.iceberg.spark.SparkUtil; @@ -51,12 +53,13 @@ import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS; -abstract class BaseSparkAction implements Action { +public abstract class BaseSparkAction implements Action { private static final AtomicInteger JOB_COUNTER = new AtomicInteger(); private final SparkSession spark; private final JavaSparkContext sparkContext; + private final Map options = Maps.newHashMap(); protected BaseSparkAction(SparkSession spark) { this.spark = spark; @@ -71,6 +74,24 @@ protected JavaSparkContext sparkContext() { return sparkContext; } + protected abstract ThisT self(); + + @Override + public ThisT option(String name, 
String value) { + options.put(name, value); + return self(); + } + + @Override + public ThisT options(Map newOptions) { + options.putAll(newOptions); + return self(); + } + + protected Map options() { + return options; + } + protected T withJobGroupInfo(JobGroupInfo info, Supplier supplier) { SparkContext context = spark().sparkContext(); JobGroupInfo previousInfo = JobGroupUtils.getJobGroupInfo(context); diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java index 89dbd24988d1..fb8ce47084d6 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java @@ -19,36 +19,15 @@ package org.apache.iceberg.actions; -import java.util.Iterator; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.apache.iceberg.ExpireSnapshots; -import org.apache.iceberg.HasTableOperations; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.JobGroupInfo; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.iceberg.util.Tasks; -import org.apache.spark.sql.Column; +import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.functions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.iceberg.TableProperties.GC_ENABLED; 
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT; /** - * An action which performs the same operation as {@link ExpireSnapshots} but uses Spark + * An action which performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark * to determine the delta in files between the pre and post-expiration table metadata. All of the same * restrictions of Remove Snapshots also apply to this action. * <p>
@@ -57,44 +36,15 @@ * require a shuffle so parallelism can be controlled through spark.sql.shuffle.partitions. The expiration is done * locally using a direct call to RemoveSnapshots. The snapshot expiration will be fully committed before any deletes * are issued. Deletes are still performed locally after retrieving the results from the Spark executors. + * + * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link BaseExpireSnapshotsSparkAction} instead. */ -@SuppressWarnings("UnnecessaryAnonymousClass") -public class ExpireSnapshotsAction extends BaseSparkAction { - private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class); - - private static final String DATA_FILE = "Data File"; - private static final String MANIFEST = "Manifest"; - private static final String MANIFEST_LIST = "Manifest List"; - - // Creates an executor service that runs each task in the thread that invokes execute/submit. - private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null; - - private final Table table; - private final TableOperations ops; - private final Consumer defaultDelete = new Consumer() { - @Override - public void accept(String file) { - ops.io().deleteFile(file); - } - }; - - private Set expireSnapshotIdValues = Sets.newHashSet(); - private Long expireOlderThanValue = null; - private Integer retainLastValue = null; - private Consumer deleteFunc = defaultDelete; - private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE; - private Dataset expiredFiles = null; - private boolean streamResults = false; - - ExpireSnapshotsAction(SparkSession spark, Table table) { - super(spark); +@Deprecated +public class ExpireSnapshotsAction implements Action { + private final BaseExpireSnapshotsSparkAction delegate; - this.table = table; - this.ops = ((HasTableOperations) table).operations(); - - ValidationException.check( - PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT), - "Cannot 
expire snapshots: GC is disabled (deleting files may corrupt other tables)"); + ExpireSnapshotsAction(BaseExpireSnapshotsSparkAction delegate) { + this.delegate = delegate; } /** @@ -105,7 +55,7 @@ public void accept(String file) { * @return this for method chaining */ public ExpireSnapshotsAction streamDeleteResults(boolean stream) { - this.streamResults = stream; + delegate.option("stream-results", Boolean.toString(stream)); return this; } @@ -116,7 +66,7 @@ public ExpireSnapshotsAction streamDeleteResults(boolean stream) { * @return this for method chaining */ public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) { - this.deleteExecutorService = executorService; + delegate.executeDeleteWith(executorService); return this; } @@ -127,7 +77,7 @@ public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) * @return this for method chaining */ public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) { - this.expireSnapshotIdValues.add(expireSnapshotId); + delegate.expireSnapshotId(expireSnapshotId); return this; } @@ -138,7 +88,7 @@ public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) { * @return this for method chaining */ public ExpireSnapshotsAction expireOlderThan(long timestampMillis) { - this.expireOlderThanValue = timestampMillis; + delegate.expireOlderThan(timestampMillis); return this; } @@ -149,9 +99,7 @@ public ExpireSnapshotsAction expireOlderThan(long timestampMillis) { * @return this for method chaining */ public ExpireSnapshotsAction retainLast(int numSnapshots) { - Preconditions.checkArgument(1 <= numSnapshots, - "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots); - this.retainLastValue = numSnapshots; + delegate.retainLast(numSnapshots); return this; } @@ -162,7 +110,7 @@ public ExpireSnapshotsAction retainLast(int numSnapshots) { * @return this for method chaining */ public ExpireSnapshotsAction deleteWith(Consumer newDeleteFunc) { - 
this.deleteFunc = newDeleteFunc; + delegate.deleteWith(newDeleteFunc); return this; } @@ -176,95 +124,12 @@ public ExpireSnapshotsAction deleteWith(Consumer newDeleteFunc) { * @return a Dataset of files that are no longer referenced by the table */ public Dataset expire() { - if (expiredFiles == null) { - // Metadata before Expiration - Dataset originalFiles = buildValidFileDF(ops.current()); - - // Perform Expiration - ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false); - for (final Long id : expireSnapshotIdValues) { - expireSnaps = expireSnaps.expireSnapshotId(id); - } - - if (expireOlderThanValue != null) { - expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue); - } - - if (retainLastValue != null) { - expireSnaps = expireSnaps.retainLast(retainLastValue); - } - - expireSnaps.commit(); - - // Metadata after Expiration - Dataset validFiles = buildValidFileDF(ops.refresh()); - - this.expiredFiles = originalFiles.except(validFiles); - } - - return expiredFiles; + return delegate.expire(); } @Override public ExpireSnapshotsActionResult execute() { - JobGroupInfo info = newJobGroupInfo("EXPIRE", "EXPIRE-SNAPSHOTS"); - return withJobGroupInfo(info, this::doExecute); - } - - private ExpireSnapshotsActionResult doExecute() { - if (streamResults) { - return deleteFiles(expire().toLocalIterator()); - } else { - return deleteFiles(expire().collectAsList().iterator()); - } - } - - private Dataset appendTypeString(Dataset ds, String type) { - return ds.select(new Column("file_path"), functions.lit(type).as("file_type")); - } - - private Dataset buildValidFileDF(TableMetadata metadata) { - Table staticTable = newStaticTable(metadata, table.io()); - return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE) - .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST)) - .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST)); - } - - /** - * Deletes files passed to it based on their type. 
- * @param expired an Iterator of Spark Rows of the structure (path: String, type: String) - * @return Statistics on which files were deleted - */ - private ExpireSnapshotsActionResult deleteFiles(Iterator expired) { - AtomicLong dataFileCount = new AtomicLong(0L); - AtomicLong manifestCount = new AtomicLong(0L); - AtomicLong manifestListCount = new AtomicLong(0L); - - Tasks.foreach(expired) - .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished() - .executeWith(deleteExecutorService) - .onFailure((fileInfo, exc) -> - LOG.warn("Delete failed for {}: {}", fileInfo.getString(1), fileInfo.getString(0), exc)) - .run(fileInfo -> { - String file = fileInfo.getString(0); - String type = fileInfo.getString(1); - deleteFunc.accept(file); - switch (type) { - case DATA_FILE: - dataFileCount.incrementAndGet(); - LOG.trace("Deleted Data File: {}", file); - break; - case MANIFEST: - manifestCount.incrementAndGet(); - LOG.debug("Deleted Manifest: {}", file); - break; - case MANIFEST_LIST: - manifestListCount.incrementAndGet(); - LOG.debug("Deleted Manifest List: {}", file); - break; - } - }); - LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get()); - return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get()); + org.apache.iceberg.actions.ExpireSnapshots.Result result = delegate.execute(); + return ExpireSnapshotsActionResult.wrap(result); } } diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java index adb74bcdfc96..8473e5042ecb 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java @@ -19,12 +19,20 @@ package org.apache.iceberg.actions; +@Deprecated public class ExpireSnapshotsActionResult { private final Long 
dataFilesDeleted; private final Long manifestFilesDeleted; private final Long manifestListsDeleted; + static ExpireSnapshotsActionResult wrap(ExpireSnapshots.Result result) { + return new ExpireSnapshotsActionResult( + result.deletedDataFilesCount(), + result.deletedManifestsCount(), + result.deletedManifestListsCount()); + } + public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) { this.dataFilesDeleted = dataFilesDeleted; this.manifestFilesDeleted = manifestFilesDeleted; diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java index 778030c2e2d3..2948e564b9d8 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java @@ -111,6 +111,11 @@ public void accept(String file) { "Cannot remove orphan files: GC is disabled (deleting files may corrupt other tables)"); } + @Override + protected RemoveOrphanFilesAction self() { + return this; + } + /** * Removes orphan files in the given location. * diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java new file mode 100644 index 000000000000..e85972e495e2 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions; + +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult; +import org.apache.iceberg.actions.BaseSparkAction; +import org.apache.iceberg.actions.ExpireSnapshots; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.GC_ENABLED; +import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT; + +/** + * An action that performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark + * to determine the delta in files between the pre and post-expiration table metadata. 
All of the same + * restrictions of {@link org.apache.iceberg.ExpireSnapshots} also apply to this action. + * <p>
+ * This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and then + * uses metadata tables to find files that can be safely deleted. This is done by anti-joining two Datasets + * that contain all manifest and data files before and after the expiration. The snapshot expiration + * will be fully committed before any deletes are issued. + * <p>
+ * This operation performs a shuffle so the parallelism can be controlled through 'spark.sql.shuffle.partitions'. + * <p>
+ * Deletes are still performed locally after retrieving the results from the Spark executors. + */ +@SuppressWarnings("UnnecessaryAnonymousClass") +public class BaseExpireSnapshotsSparkAction + extends BaseSparkAction implements ExpireSnapshots { + private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class); + + private static final String DATA_FILE = "Data File"; + private static final String MANIFEST = "Manifest"; + private static final String MANIFEST_LIST = "Manifest List"; + + private static final String STREAM_RESULTS = "stream-results"; + + // Creates an executor service that runs each task in the thread that invokes execute/submit. + private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null; + + private final Table table; + private final TableOperations ops; + private final Consumer defaultDelete = new Consumer() { + @Override + public void accept(String file) { + ops.io().deleteFile(file); + } + }; + + private final Set expiredSnapshotIds = Sets.newHashSet(); + private Long expireOlderThanValue = null; + private Integer retainLastValue = null; + private Consumer deleteFunc = defaultDelete; + private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE; + private Dataset expiredFiles = null; + + public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + this.ops = ((HasTableOperations) table).operations(); + + ValidationException.check( + PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT), + "Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)"); + } + + @Override + protected ExpireSnapshots self() { + return this; + } + + @Override + public BaseExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) { + this.deleteExecutorService = executorService; + return this; + } + + @Override + public BaseExpireSnapshotsSparkAction expireSnapshotId(long 
snapshotId) { + expiredSnapshotIds.add(snapshotId); + return this; + } + + @Override + public BaseExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) { + this.expireOlderThanValue = timestampMillis; + return this; + } + + @Override + public BaseExpireSnapshotsSparkAction retainLast(int numSnapshots) { + Preconditions.checkArgument(1 <= numSnapshots, + "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots); + this.retainLastValue = numSnapshots; + return this; + } + + @Override + public BaseExpireSnapshotsSparkAction deleteWith(Consumer newDeleteFunc) { + this.deleteFunc = newDeleteFunc; + return this; + } + + /** + * Expires snapshots and commits the changes to the table, returning a Dataset of files to delete. + * <p>
+ * This does not delete data files. To delete data files, run {@link #execute()}. + * <p>
+ * This may be called before or after {@link #execute()} is called to return the expired file list. + * + * @return a Dataset of files that are no longer referenced by the table + */ + public Dataset expire() { + if (expiredFiles == null) { + // fetch metadata before expiration + Dataset originalFiles = buildValidFileDF(ops.current()); + + // perform expiration + org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false); + for (long id : expiredSnapshotIds) { + expireSnapshots = expireSnapshots.expireSnapshotId(id); + } + + if (expireOlderThanValue != null) { + expireSnapshots = expireSnapshots.expireOlderThan(expireOlderThanValue); + } + + if (retainLastValue != null) { + expireSnapshots = expireSnapshots.retainLast(retainLastValue); + } + + expireSnapshots.commit(); + + // fetch metadata after expiration + Dataset validFiles = buildValidFileDF(ops.refresh()); + + // determine expired files + this.expiredFiles = originalFiles.except(validFiles); + } + + return expiredFiles; + } + + @Override + public ExpireSnapshots.Result execute() { + JobGroupInfo info = newJobGroupInfo("EXPIRE-SNAPSHOTS", "EXPIRE-SNAPSHOTS"); + return withJobGroupInfo(info, this::doExecute); + } + + private ExpireSnapshots.Result doExecute() { + boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false); + if (streamResults) { + return deleteFiles(expire().toLocalIterator()); + } else { + return deleteFiles(expire().collectAsList().iterator()); + } + } + + private Dataset appendTypeString(Dataset ds, String type) { + return ds.select(new Column("file_path"), functions.lit(type).as("file_type")); + } + + private Dataset buildValidFileDF(TableMetadata metadata) { + Table staticTable = newStaticTable(metadata, this.table.io()); + return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE) + .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST)) + 
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST)); + } + + /** + * Deletes files passed to it based on their type. + * + * @param expired an Iterator of Spark Rows of the structure (path: String, type: String) + * @return Statistics on which files were deleted + */ + private BaseExpireSnapshotsActionResult deleteFiles(Iterator expired) { + AtomicLong dataFileCount = new AtomicLong(0L); + AtomicLong manifestCount = new AtomicLong(0L); + AtomicLong manifestListCount = new AtomicLong(0L); + + Tasks.foreach(expired) + .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished() + .executeWith(deleteExecutorService) + .onFailure((fileInfo, exc) -> { + String file = fileInfo.getString(0); + String type = fileInfo.getString(1); + LOG.warn("Delete failed for {}: {}", type, file, exc); + }) + .run(fileInfo -> { + String file = fileInfo.getString(0); + String type = fileInfo.getString(1); + deleteFunc.accept(file); + switch (type) { + case DATA_FILE: + dataFileCount.incrementAndGet(); + LOG.trace("Deleted Data File: {}", file); + break; + case MANIFEST: + manifestCount.incrementAndGet(); + LOG.debug("Deleted Manifest: {}", file); + break; + case MANIFEST_LIST: + manifestListCount.incrementAndGet(); + LOG.debug("Deleted Manifest List: {}", file); + break; + } + }); + + LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get()); + return new BaseExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get()); + } +}