4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -555,6 +555,10 @@ public static Builder<Integer> range(int upTo) {
return new Builder<>(new Range(upTo));
}

public static <I> Builder<I> foreach(Iterator<I> items) {
return new Builder<>(() -> items);
}

public static <I> Builder<I> foreach(Iterable<I> items) {
return new Builder<>(items);
}
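For context, a minimal sketch of how the new Iterator overload might be called; the wrapper () -> items is a single-shot Iterable, so the task group must traverse it only once. The paths and the ForeachIteratorExample class are hypothetical, but the builder methods (retry, suppressFailureWhenFinished, run) are the ones used later in this PR.

import java.util.Iterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;

public class ForeachIteratorExample {
  public static void main(String[] args) {
    // Hypothetical file paths; any Iterator<I> works with the new overload.
    Iterator<String> paths = Lists.newArrayList("s3://bucket/a", "s3://bucket/b").iterator();
    Tasks.foreach(paths)
        .retry(3)
        .suppressFailureWhenFinished()
        .run(path -> System.out.println("processing " + path));
  }
}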
4 changes: 4 additions & 0 deletions spark/src/main/java/org/apache/iceberg/actions/Actions.java
@@ -51,4 +51,8 @@ public RewriteManifestsAction rewriteManifests() {
public RewriteDataFilesAction rewriteDataFiles() {
return new RewriteDataFilesAction(spark, table);
}

public ExpireSnapshotsAction expireSnapshots() {
return new ExpireSnapshotsAction(spark, table);
}
}
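A hedged sketch of invoking the new action through Actions; the forTable factory and the table variable are assumed to be in scope, and the one-week cutoff is an arbitrary example value.

import java.util.concurrent.TimeUnit;

// Expire everything older than a week, but keep at least 10 snapshots.
long cutoffMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
ExpireSnapshotsActionResult result = Actions.forTable(table)
    .expireSnapshots()
    .expireOlderThan(cutoffMillis)
    .retainLast(10)
    .execute();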
71 changes: 71 additions & 0 deletions spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -19,8 +19,17 @@

package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

abstract class BaseAction<R> implements Action<R> {

@@ -41,4 +50,66 @@ protected String metadataTableName(MetadataTableType type) {
return tableName + "." + type;
}
}

/**
* Returns the locations of all manifest lists for a given table.
* @param table the table
* @return the paths of the manifest lists
*/
protected List<String> getManifestListPaths(Table table) {
List<String> manifestLists = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
String manifestListLocation = snapshot.manifestListLocation();
if (manifestListLocation != null) {
manifestLists.add(manifestListLocation);
}
}
return manifestLists;
}

/**
* Returns metadata file paths that may not be referenced by the current table metadata. Specifically,
* this includes the "version-hint" file, the current metadata file, and all entries in
* metadata.previousFiles.
* @param ops the TableOperations of the table to read paths from
* @return a list of paths to metadata files
*/
protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
List<String> otherMetadataFiles = Lists.newArrayList();
otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));

TableMetadata metadata = ops.current();
otherMetadataFiles.add(metadata.metadataFileLocation());
for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
otherMetadataFiles.add(previousMetadataFile.file());
}
return otherMetadataFiles;
}

protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
String allDataFilesMetadataTable = metadataTableName(MetadataTableType.ALL_DATA_FILES);
return spark.read().format("iceberg").load(allDataFilesMetadataTable).select("file_path");
}

protected Dataset<Row> buildManifestFileDF(SparkSession spark) {
String allManifestsMetadataTable = metadataTableName(MetadataTableType.ALL_MANIFESTS);
return spark.read().format("iceberg").load(allManifestsMetadataTable).selectExpr("path as file_path");
}

protected Dataset<Row> buildManifestListDF(SparkSession spark, Table table) {
List<String> manifestLists = getManifestListPaths(table);
return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path");
}

protected Dataset<Row> buildOtherMetadataFileDF(SparkSession spark, TableOperations ops) {
List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
}

protected Dataset<Row> buildValidMetadataFileDF(SparkSession spark, Table table, TableOperations ops) {
Dataset<Row> manifestDF = buildManifestFileDF(spark);
Dataset<Row> manifestListDF = buildManifestListDF(spark, table);
Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(spark, ops);

return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
}
}
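For reference, a standalone sketch of the metadata-table reads these helpers perform; "db.table" is a placeholder identifier, and the all_data_files / all_manifests suffixes come from MetadataTableType.

// What buildValidDataFileDF does for a table named "db.table".
Dataset<Row> dataFiles = spark.read()
    .format("iceberg")
    .load("db.table.all_data_files")
    .select("file_path");

// What buildManifestFileDF does; manifest paths are renamed so the union columns align.
Dataset<Row> manifests = spark.read()
    .format("iceberg")
    .load("db.table.all_manifests")
    .selectExpr("path as file_path");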
232 changes: 232 additions & 0 deletions spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An action that performs the same operation as {@link ExpireSnapshots} but uses Spark
* to determine the delta in files between the pre- and post-expiration table metadata. All of the same
* restrictions of {@link ExpireSnapshots} also apply to this action.
* <p>
* This implementation uses the metadata tables of the table being expired to list all manifest and data files.
* These are loaded into a DataFrame that is anti-joined with the same listing taken after the expiration. This
* operation requires a shuffle, so parallelism can be controlled through spark.sql.shuffle.partitions. The
* expiration itself is done locally using a direct call to RemoveSnapshots. The snapshot expiration is fully
* committed before any deletes are issued, and deletes are performed locally after retrieving the results from
* the Spark executors.
*/
public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotsActionResult> {
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);

private static final String DATA_FILE = "Data File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";

// A null executor service causes Tasks to run each delete in the thread that calls execute()
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final SparkSession spark;
private final Table table;
private final TableOperations ops;
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
ops.io().deleteFile(file);
}
};

private Set<Long> expireSnapshotIdValues = Sets.newHashSet();
private Long expireOlderThanValue = null;
private Integer retainLastValue = null;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;

ExpireSnapshotsAction(SparkSession spark, Table table) {
this.spark = spark;
this.table = table;
this.ops = ((HasTableOperations) table).operations();
}

@Override
protected Table table() {
return table;
}

/**
* An executor service used when deleting files. Only used during the local delete phase of this Spark action.
* Similar to {@link ExpireSnapshots#executeWith(ExecutorService)}.
* @param executorService the service to use
* @return this for method chaining
*/
public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
return this;
}

/**
* Expires a specific snapshot.
* Identical to {@link ExpireSnapshots#expireSnapshotId(long)}.
* @param expireSnapshotId the id of the snapshot to expire
* @return this for method chaining
*/
public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
this.expireSnapshotIdValues.add(expireSnapshotId);
return this;
}

/**
* Expires all snapshots older than a given timestamp.
* Identical to {@link ExpireSnapshots#expireOlderThan(long)}.
* @param timestampMillis all snapshots before this time will be expired
* @return this for method chaining
*/
public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
this.expireOlderThanValue = timestampMillis;
return this;
}

/**
* Retains at least the given number of snapshots when expiring.
* Identical to {@link ExpireSnapshots#retainLast(int)}.
* @param numSnapshots the number of snapshots to retain
* @return this for method chaining
*/
public ExpireSnapshotsAction retainLast(int numSnapshots) {
Preconditions.checkArgument(1 <= numSnapshots,
"Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
this.retainLastValue = numSnapshots;
return this;
}

/**
* The Consumer applied to files determined to be expired. By default, files are deleted through the
* table's FileIO.
* Identical to {@link ExpireSnapshots#deleteWith(Consumer)}.
* @param newDeleteFunc a Consumer that takes a path and deletes it
* @return this for method chaining
*/
public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
return this;
}

@Override
public ExpireSnapshotsActionResult execute() {
Dataset<Row> originalFiles = null;
try {
// Metadata before Expiration
originalFiles = buildValidFileDF().persist();
// Action to trigger persist
originalFiles.count();

// Perform Expiration
ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
for (final Long id : expireSnapshotIdValues) {
expireSnaps = expireSnaps.expireSnapshotId(id);
}

if (expireOlderThanValue != null) {
expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
}

if (retainLastValue != null) {
expireSnaps = expireSnaps.retainLast(retainLastValue);
}

expireSnaps.commit();

// Metadata after Expiration
Dataset<Row> validFiles = buildValidFileDF();
Dataset<Row> filesToDelete = originalFiles.except(validFiles);

return deleteFiles(filesToDelete.toLocalIterator());
} finally {
if (originalFiles != null) {
originalFiles.unpersist();
}
}
}

private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF() {
return appendTypeString(buildValidDataFileDF(spark), DATA_FILE)
.union(appendTypeString(buildManifestFileDF(spark), MANIFEST))
.union(appendTypeString(buildManifestListDF(spark, table), MANIFEST_LIST));
}

/**
* Deletes the given files, dispatching on their type for counting and logging.
* @param expiredFiles an Iterator of Spark Rows with the structure (file_path: String, file_type: String)
* @return statistics on the files deleted, grouped by type
*/
private ExpireSnapshotsActionResult deleteFiles(Iterator<Row> expiredFiles) {
AtomicLong dataFileCount = new AtomicLong(0L);
AtomicLong manifestCount = new AtomicLong(0L);
AtomicLong manifestListCount = new AtomicLong(0L);

Tasks.foreach(expiredFiles)
.retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
.executeWith(deleteExecutorService)
.onFailure((fileInfo, exc) ->
LOG.warn("Delete failed for {}: {}", fileInfo.getString(1), fileInfo.getString(0), exc))
.run(fileInfo -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
deleteFunc.accept(file);
switch (type) {
case DATA_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
LOG.debug("Deleted Manifest: {}", file);
break;
case MANIFEST_LIST:
manifestListCount.incrementAndGet();
LOG.debug("Deleted Manifest List: {}", file);
break;
}
});
LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
return new ExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
}
}
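A hedged end-to-end sketch exercising the delete hooks: a fixed thread pool via executeDeleteWith, and a logging-only deleteWith consumer as a dry run. Note that the snapshot expiration is still committed in this mode; only the physical deletes are skipped. Actions.forTable, the table variable, cutoffMillis, and LOG are assumed to be in scope.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService deletePool = Executors.newFixedThreadPool(8);
try {
  ExpireSnapshotsActionResult result = Actions.forTable(table)
      .expireSnapshots()
      .expireOlderThan(cutoffMillis)
      .executeDeleteWith(deletePool)                            // parallel local deletes
      .deleteWith(path -> LOG.info("would delete {}", path))    // no files actually removed
      .execute();
  LOG.info("expired {} manifest lists", result.manifestListsDeleted());
} finally {
  deletePool.shutdown();
}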
46 changes: 46 additions & 0 deletions spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

public class ExpireSnapshotsActionResult {

private final Long dataFilesDeleted;
private final Long manifestFilesDeleted;
private final Long manifestListsDeleted;

public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) {
this.dataFilesDeleted = dataFilesDeleted;
this.manifestFilesDeleted = manifestFilesDeleted;
this.manifestListsDeleted = manifestListsDeleted;
}

public Long dataFilesDeleted() {
return dataFilesDeleted;
}

public Long manifestFilesDeleted() {
return manifestFilesDeleted;
}

public Long manifestListsDeleted() {
return manifestListsDeleted;
}

}