Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,11 @@ default RewriteDataFiles rewriteDataFiles(Table table) {
default ExpireSnapshots expireSnapshots(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement expireSnapshots");
}

/**
* Instantiates an action to remove all the files reachable from given metadata location.
*/
default RemoveReachableFiles removeReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement removeReachableFiles");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.iceberg.io.FileIO;

/**
* An action that removes all files referenced by a table metadata file.
* <p>
* This action will irreversibly delete all reachable files such as data files, manifests,
* manifest lists and should be used to clean up the underlying storage once a table is dropped
* and no longer needed.
* <p>
* Implementations may use a query engine to distribute parts of work.
*/
public interface RemoveReachableFiles extends Action<RemoveReachableFiles, RemoveReachableFiles.Result> {

/**
* Passes an alternative delete implementation that will be used for files.
*
* @param removeFunc a function that will be called to delete files.
* The function accepts path to file as an argument.
* @return this for method chaining
*/
RemoveReachableFiles deleteWith(Consumer<String> removeFunc);

/**
* Passes an alternative executor service that will be used for files removal.
* <p>
* If this method is not called, files will be deleted in the current thread.
*
* @param executorService the service to use
* @return this for method chaining
*/
RemoveReachableFiles executeDeleteWith(ExecutorService executorService);

/**
* Set the {@link FileIO} to be used for files removal
*
* @param io FileIO to use for files removal
* @return this for method chaining
*/
RemoveReachableFiles io(FileIO io);

/**
* The action result that contains a summary of the execution.
*/
interface Result {

/**
* Returns the number of data files removed.
*/
long removedDataFilesCount();

/**
* Returns the number of manifests removed.
*/
long removedManifestsCount();

/**
* Returns the number of manifest lists removed.
*/
long removedManifestListsCount();

/**
* Returns the number of metadata json, version hint files removed.
*/
long otherRemovedFilesCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

public class BaseRemoveFilesActionResult implements RemoveReachableFiles.Result {

private final long deletedDataFilesCount;
private final long deletedManifestsCount;
private final long deletedManifestListsCount;
private final long deletedOtherFilesCount;

public BaseRemoveFilesActionResult(long deletedDataFilesCount,
long deletedManifestsCount,
long deletedManifestListsCount,
long otherDeletedFilesCount) {
this.deletedDataFilesCount = deletedDataFilesCount;
this.deletedManifestsCount = deletedManifestsCount;
this.deletedManifestListsCount = deletedManifestListsCount;
this.deletedOtherFilesCount = otherDeletedFilesCount;
}

@Override
public long removedDataFilesCount() {
return deletedDataFilesCount;
}

@Override
public long removedManifestsCount() {
return deletedManifestsCount;
}

@Override
public long removedManifestListsCount() {
return deletedManifestListsCount;
}

@Override
public long otherRemovedFilesCount() {
return deletedOtherFilesCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.actions;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.BaseRemoveFilesActionResult;
import org.apache.iceberg.actions.RemoveReachableFiles;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of {@link RemoveReachableFiles} that uses metadata tables in Spark
* to determine which files should be deleted.
*/
@SuppressWarnings("UnnecessaryAnonymousClass")
public class BaseRemoveReachableFilesSparkAction
extends BaseSparkAction<RemoveReachableFiles, RemoveReachableFiles.Result> implements RemoveReachableFiles {
private static final Logger LOG = LoggerFactory.getLogger(BaseRemoveReachableFilesSparkAction.class);

private static final String DATA_FILE = "Data File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
private static final String OTHERS = "Others";

private static final String STREAM_RESULTS = "stream-results";

// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final String metadataLocation;
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
io.deleteFile(file);
}
};

private Consumer<String> removeFunc = defaultDelete;
private ExecutorService removeExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());

public BaseRemoveReachableFilesSparkAction(SparkSession spark, String metadataLocation) {
super(spark);
this.metadataLocation = metadataLocation;
}

@Override
protected RemoveReachableFiles self() {
return this;
}

@Override
public RemoveReachableFiles io(FileIO fileIO) {
this.io = fileIO;
return this;
}

@Override
public RemoveReachableFiles deleteWith(Consumer<String> removeFn) {
this.removeFunc = removeFn;
return this;

}

@Override
public RemoveReachableFiles executeDeleteWith(ExecutorService executorService) {
this.removeExecutorService = executorService;
return this;
}

@Override
public Result execute() {
Preconditions.checkArgument(io != null, "File IO cannot be null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably fine here, but could put the precondition in the "io(FileIO fileIO)" method for just a little earlier erroring

String msg = String.format("Removing files reachable from %s", metadataLocation);
JobGroupInfo info = newJobGroupInfo("REMOVE-FILES", msg);
return withJobGroupInfo(info, this::doExecute);
}

private Result doExecute() {
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
TableMetadata metadata = TableMetadataParser.read(io, metadataLocation);
Dataset<Row> validFileDF = buildValidFileDF(metadata).distinct();
if (streamResults) {
return deleteFiles(validFileDF.toLocalIterator());
} else {
return deleteFiles(validFileDF.collectAsList().iterator());
}
}

private Dataset<Row> projectFilePathWithType(Dataset<Row> ds, String type) {
return ds.select(functions.col("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
return projectFilePathWithType(buildValidDataFileDF(staticTable), DATA_FILE)
.union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST))
.union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS));
}

@Override
protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
List<String> otherMetadataFiles = Lists.newArrayList();
otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true));
otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
}

/**
* Deletes files passed to it.
*
* @param deleted an Iterator of Spark Rows of the structure (path: String, type: String)
* @return Statistics on which files were deleted
*/
private BaseRemoveFilesActionResult deleteFiles(Iterator<Row> deleted) {
AtomicLong dataFileCount = new AtomicLong(0L);
AtomicLong manifestCount = new AtomicLong(0L);
AtomicLong manifestListCount = new AtomicLong(0L);
AtomicLong otherFilesCount = new AtomicLong(0L);

Tasks.foreach(deleted)
.retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
.executeWith(removeExecutorService)
.onFailure((fileInfo, exc) -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
LOG.warn("Delete failed for {}: {}", type, file, exc);
})
.run(fileInfo -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
removeFunc.accept(file);
switch (type) {
case DATA_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
LOG.debug("Deleted Manifest: {}", file);
break;
case MANIFEST_LIST:
manifestListCount.incrementAndGet();
LOG.debug("Deleted Manifest List: {}", file);
break;
case OTHERS:
otherFilesCount.incrementAndGet();
LOG.debug("Others: {}", file);
break;
}
});

long filesCount = dataFileCount.get() + manifestCount.get() + manifestListCount.get() + otherFilesCount.get();
LOG.info("Total files removed: {}", filesCount);
return new BaseRemoveFilesActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get(),
otherFilesCount.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.actions.RemoveOrphanFiles;
import org.apache.iceberg.actions.RemoveReachableFiles;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.spark.sql.SparkSession;

Expand Down Expand Up @@ -52,4 +53,9 @@ public RewriteManifests rewriteManifests(Table table) {
public ExpireSnapshots expireSnapshots(Table table) {
return new BaseExpireSnapshotsSparkAction(spark, table);
}

@Override
public RemoveReachableFiles removeReachableFiles(String metadataLocation) {
return new BaseRemoveReachableFilesSparkAction(spark, metadataLocation);
}
}
Loading