Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.function.Consumer;

/**
* An action that deletes orphan files in a table.
* An action that deletes orphan metadata, data and delete files in a table.
* <p>
* A metadata or data file is considered orphan if it is not reachable by any valid snapshot.
* A file is considered orphan if it is not reachable by any valid snapshot.
* The set of actual files is built by listing the underlying storage which makes this operation
* expensive.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
ExpireSnapshots retainLast(int numSnapshots);

/**
* Passes an alternative delete implementation that will be used for manifests and data files.
* Passes an alternative delete implementation that will be used for manifests, data and delete files.
* <p>
* Manifest files that are no longer used by valid snapshots will be deleted. Data files that were
* deleted by snapshots that are expired will be deleted.
* Manifest files that are no longer used by valid snapshots will be deleted. Content files that were
* marked as logically deleted by snapshots that are expired will be deleted as well.
* <p>
* If this method is not called, unnecessary manifests and data files will still be deleted.
* If this method is not called, unnecessary manifests and content files will still be deleted.
* <p>
* Identical to {@link org.apache.iceberg.ExpireSnapshots#deleteWith(Consumer)}
*
Expand All @@ -80,9 +80,9 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
ExpireSnapshots deleteWith(Consumer<String> deleteFunc);

/**
* Passes an alternative executor service that will be used for manifests and data files deletion.
* Passes an alternative executor service that will be used for deleting manifests, data and delete files.
* <p>
* If this method is not called, unnecessary manifests and data files will still be deleted in
* If this method is not called, unnecessary manifests and content files will still be deleted in
* the current thread.
* <p>
* Identical to {@link org.apache.iceberg.ExpireSnapshots#executeDeleteWith(ExecutorService)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;

/**
* An action that removes orphan metadata and data files by listing a given location and comparing
* the actual files in that location with data and metadata files referenced by all valid snapshots.
* An action that removes orphan metadata, data and delete files by listing a given location and comparing
* the actual files in that location with content and metadata files referenced by all valid snapshots.
* The location must be accessible for listing via the Hadoop {@link FileSystem}.
* <p>
* By default, this action cleans up the table location returned by {@link Table#location()} and
Expand Down Expand Up @@ -162,9 +162,9 @@ private String jobDesc() {
}

private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validDataFileDF = buildValidDataFileDF(table);
Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();

Column actualFileName = filenameUDF.apply(actualFileDF.col("file_path"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class BaseDeleteReachableFilesSparkAction
extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);

private static final String DATA_FILE = "Data File";
private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
private static final String OTHERS = "Others";
Expand Down Expand Up @@ -138,7 +138,7 @@ private Dataset<Row> projectFilePathWithType(Dataset<Row> ds, String type) {

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
return projectFilePathWithType(buildValidDataFileDF(staticTable), DATA_FILE)
return projectFilePathWithType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST))
.union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS));
Expand Down Expand Up @@ -177,9 +177,9 @@ private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row> deleted)
String type = fileInfo.getString(1);
removeFunc.accept(file);
switch (type) {
case DATA_FILE:
case CONTENT_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
LOG.trace("Deleted Content File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* <p>
* This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and then
* uses metadata tables to find files that can be safely deleted. This is done by anti-joining two Datasets
* that contain all manifest and data files before and after the expiration. The snapshot expiration
* that contain all manifest and content files before and after the expiration. The snapshot expiration
* will be fully committed before any deletes are issued.
* <p>
* This operation performs a shuffle so the parallelism can be controlled through 'spark.sql.shuffle.partitions'.
Expand All @@ -70,7 +70,7 @@ public class BaseExpireSnapshotsSparkAction
extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);

private static final String DATA_FILE = "Data File";
private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";

Expand Down Expand Up @@ -226,7 +226,7 @@ private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, this.table.io());
return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE)
return appendTypeString(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
}
Expand Down Expand Up @@ -255,9 +255,9 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
String type = fileInfo.getString(1);
deleteFunc.accept(file);
switch (type) {
case DATA_FILE:
case CONTENT_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
LOG.trace("Deleted Content File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}

protected Dataset<Row> buildValidDataFileDF(Table table) {
// builds a DF of delete and data file locations by reading all manifests
protected Dataset<Row> buildValidContentFileDF(Table table) {
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));

Expand Down