@@ -52,6 +52,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
private final int keyMetadataPosition;
private final int splitOffsetsPosition;
private final int sortOrderIdPosition;
private final int fileSpecIdPosition;
private final int equalityIdsPosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
@@ -100,6 +101,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
}

@@ -120,7 +122,10 @@ public Long pos() {

@Override
public int specId() {
return -1;
if (wrapped.isNullAt(fileSpecIdPosition)) {
return -1;
}
return wrapped.getAs(fileSpecIdPosition);
}

@Override
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.actions;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.min;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ImmutableRemoveDanglingDeleteFiles;
import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An action that removes dangling delete files from the current snapshot. A delete file is
* dangling if its deletes no longer apply to any live data file.
*
* <p>The following dangling delete files are removed:
*
* <ul>
*   <li>Position delete files with a data sequence number less than that of every live data
*       file in the same partition
*   <li>Equality delete files with a data sequence number less than or equal to that of every
*       live data file in the same partition
* </ul>
*/
class RemoveDanglingDeletesSparkAction
extends BaseSnapshotUpdateSparkAction<RemoveDanglingDeletesSparkAction>
implements RemoveDanglingDeleteFiles {
private static final Logger LOG = LoggerFactory.getLogger(RemoveDanglingDeletesSparkAction.class);
private final Table table;

protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
}

@Override
protected RemoveDanglingDeletesSparkAction self() {
return this;
}

public Result execute() {
if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
// ManifestFilterManager already performs this table-wide delete on each commit
return ImmutableRemoveDanglingDeleteFiles.Result.builder()
.removedDeleteFiles(Collections.emptyList())
.build();
}
String desc = String.format("Removing dangling delete files in %s", table.name());
JobGroupInfo info = newJobGroupInfo("REMOVE-DELETES", desc);
return withJobGroupInfo(info, this::doExecute);
}

Result doExecute() {
RewriteFiles rewriteFiles = table.newRewrite();
List<DeleteFile> danglingDeletes = findDanglingDeletes();
for (DeleteFile deleteFile : danglingDeletes) {
LOG.debug("Removing dangling delete file {}", deleteFile.path());
rewriteFiles.deleteFile(deleteFile);
}
if (!danglingDeletes.isEmpty()) {
commit(rewriteFiles);
}
return ImmutableRemoveDanglingDeleteFiles.Result.builder()
.removedDeleteFiles(danglingDeletes)
.build();
}

/**
* Dangling delete files can be identified with the following steps:
*
* <ol>
*   <li>Group data files by partition keys and find the minimum data sequence number in each
*       group.
*   <li>Left outer join delete files with the partition-grouped data files on partition keys.
*   <li>Find dangling deletes by comparing each delete file's sequence number to its
*       partition's minimum data sequence number.
*   <li>Collect the resulting rows to the driver and use {@link SparkDeleteFile SparkDeleteFile}
*       to wrap them as valid delete files.
* </ol>
*/
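// A rough SQL rendering of the query built below, for illustration only; the names
// live_data_entries and live_delete_entries are hypothetical stand-ins for the two
// filtered scans of the ENTRIES metadata table in the method that follows.
//
//   SELECT delete_entry.data_file.*
//   FROM live_delete_entries AS delete_entry
//   LEFT JOIN (
//     SELECT data_file.partition AS grouped_partition,
//            data_file.spec_id AS grouped_spec_id,
//            MIN(sequence_number) AS min_data_sequence_number
//     FROM live_data_entries
//     GROUP BY data_file.partition, data_file.spec_id
//   ) AS grouped
//     ON delete_entry.data_file.spec_id = grouped.grouped_spec_id
//     AND delete_entry.data_file.partition = grouped.grouped_partition
//   WHERE grouped.min_data_sequence_number IS NULL
//     OR (delete_entry.data_file.content = 1
//         AND delete_entry.sequence_number < grouped.min_data_sequence_number)
//     OR (delete_entry.data_file.content = 2
//         AND delete_entry.sequence_number <= grouped.min_data_sequence_number)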
private List<DeleteFile> findDanglingDeletes() {
Dataset<Row> minSequenceNumberByPartition =
loadMetadataTable(table, MetadataTableType.ENTRIES)
// find live data files
.filter("data_file.content == 0 AND status < 2")
.selectExpr(
"data_file.partition as partition",
"data_file.spec_id as spec_id",
"sequence_number")
.groupBy("partition", "spec_id")
.agg(min("sequence_number"))
.toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");
Dataset<Row> deleteEntries =
loadMetadataTable(table, MetadataTableType.ENTRIES)
// find live delete files
.filter("data_file.content != 0 AND status < 2");
Column joinOnPartition =
deleteEntries
.col("data_file.spec_id")
.equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
.and(
deleteEntries
.col("data_file.partition")
.equalTo(minSequenceNumberByPartition.col("grouped_partition")));
Column filterOnDanglingDeletes =
col("min_data_sequence_number")
// delete files without any data files in the partition
.isNull()
// position delete files without any applicable data files in the partition
.or(
col("data_file.content")
.equalTo("1")
.and(col("sequence_number").$less(col("min_data_sequence_number"))))
// equality delete files without any applicable data files in the partition
.or(
col("data_file.content")
.equalTo("2")
.and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));
Dataset<Row> danglingDeletes =
deleteEntries
.join(minSequenceNumberByPartition, joinOnPartition, "left")
.filter(filterOnDanglingDeletes)
.select("data_file.*");
return danglingDeletes.collectAsList().stream()
// map on driver because SparkDeleteFile is not serializable
.map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
.collect(Collectors.toList());
}

private DeleteFile deleteFileWrapper(StructType sparkFileType, Row row) {
int specId = row.getInt(row.fieldIndex("spec_id"));
Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table));
// Project with the partition type of the file's own spec
Types.StructType projection = DataFile.getType(table.specs().get(specId).partitionType());
return new SparkDeleteFile(combinedFileType, projection, sparkFileType).wrap(row);
}
}
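For context, a minimal usage sketch (not part of this change), assuming a configured SparkSession named spark, a loaded Iceberg Table named table, and the usual imports from org.apache.iceberg:

// Illustrative only: invoke the new action through the SparkActions entry point
// registered later in this change and inspect which delete files were dropped.
RemoveDanglingDeleteFiles.Result result =
    SparkActions.get(spark).removeDanglingDeleteFiles(table).execute();
for (DeleteFile removed : result.removedDeleteFiles()) {
  System.out.println("Removed dangling delete file: " + removed.path());
}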
@@ -40,6 +40,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.FileRewriter;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles.Result.Builder;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
@@ -82,7 +84,8 @@ public class RewriteDataFilesSparkAction
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER,
OUTPUT_SPEC_ID);
OUTPUT_SPEC_ID,
REMOVE_DANGLING_DELETES);

private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
@@ -93,6 +96,7 @@ public class RewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private FileRewriter<FileScanTask, DataFile> rewriter = null;
@@ -173,11 +177,17 @@ public RewriteDataFiles.Result execute() {

Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);

if (partialProgressEnabled) {
return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
} else {
return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
Builder resultBuilder =
partialProgressEnabled
? doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId))
: doExecute(ctx, groupStream, commitManager(startingSnapshotId));
if (removeDanglingDeletes) {
RemoveDanglingDeletesSparkAction action =
new RemoveDanglingDeletesSparkAction(spark(), table);
int removedCount = Iterables.size(action.execute().removedDeleteFiles());
resultBuilder.removedDeleteFilesCount(removedCount);
}
return resultBuilder.build();
}

StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
@@ -261,7 +271,7 @@ RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
}

private Result doExecute(
private Builder doExecute(
RewriteExecutionContext ctx,
Stream<RewriteFileGroup> groupStream,
RewriteDataFilesCommitManager commitManager) {
@@ -323,10 +333,10 @@ private Result doExecute(

List<FileGroupRewriteResult> rewriteResults =
rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
}

private Result doExecuteWithPartialProgress(
private Builder doExecuteWithPartialProgress(
RewriteExecutionContext ctx,
Stream<RewriteFileGroup> groupStream,
RewriteDataFilesCommitManager commitManager) {
@@ -372,8 +382,7 @@ private Result doExecuteWithPartialProgress(
commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
return ImmutableRewriteDataFiles.Result.builder()
.rewriteResults(rewriteResults)
.rewriteFailures(rewriteFailures)
.build();
.rewriteFailures(rewriteFailures);
}

Stream<RewriteFileGroup> toGroupStream(
@@ -435,6 +444,10 @@ void validateAndInitOptions() {
PropertyUtil.propertyAsBoolean(
options(), USE_STARTING_SEQUENCE_NUMBER, USE_STARTING_SEQUENCE_NUMBER_DEFAULT);

removeDanglingDeletes =
PropertyUtil.propertyAsBoolean(
options(), REMOVE_DANGLING_DELETES, REMOVE_DANGLING_DELETES_DEFAULT);

rewriteJobOrder =
RewriteJobOrder.fromName(
PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER, REWRITE_JOB_ORDER_DEFAULT));
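A hedged sketch of enabling the new option on a data file rewrite, assuming REMOVE_DANGLING_DELETES is the option key constant on RewriteDataFiles that validateAndInitOptions() reads above, and that the rewrite result exposes the removedDeleteFilesCount set in execute():

// Illustrative only: remove dangling deletes as part of a rewrite-data-files run.
RewriteDataFiles.Result result =
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true")
        .execute();
System.out.println("Removed delete files: " + result.removedDeleteFilesCount());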
@@ -21,6 +21,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.spark.sql.SparkSession;
@@ -102,4 +103,9 @@ public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table)
public ComputeTableStats computeTableStats(Table table) {
return new ComputeTableStatsSparkAction(spark, table);
}

@Override
public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
return new RemoveDanglingDeletesSparkAction(spark, table);
}
}