49 changes: 46 additions & 3 deletions api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -20,19 +20,25 @@
package org.apache.iceberg;

/**
* Not recommended: API for overwriting files in a table by partition.
* API for overwriting files in a table by partition.
* <p>
* This is provided to implement SQL compatible with Hive table operations but is not recommended.
* Instead, use the {@link OverwriteFiles overwrite API} to explicitly overwrite data.
* <p>
* The default validation mode is idempotent, meaning the overwrite is
* correct and should be committed regardless of other concurrent changes to the table.
* Alternatively, this API can be configured to validate that no new data or deletes
* have been applied since the snapshot at which this operation began.
* This is done by calling {@link #validateNoConflictingDeletes()} and {@link #validateNoConflictingData()}
* to ensure that no conflicting delete files or data files, respectively, have been written since the
* snapshot passed to {@link #validateFromSnapshot(long)}.
* <p>
* This API accumulates file additions and produces a new {@link Snapshot} of the table by replacing
* all existing files in each partition that receives new data with the new additions. This operation
* is used to implement dynamic partition replacement.
* <p>
* When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
* Unless the validations above are enabled, this operation places no requirements on the latest
* snapshot and will not fail based on other snapshot changes.
*/
public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
/**
@@ -49,4 +55,41 @@ public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
* @return this for method chaining
*/
ReplacePartitions validateAppendOnly();

/**
* Set the snapshot ID used in validations for this operation.
* <p>
* All validations will check changes after this snapshot ID. If this is not called, validations
* will check changes from the beginning of the table's history.
* <p>
* This method should be called before this operation is committed.
* If a concurrent operation committed a data or delete file after the given snapshot ID that might
* contain rows matching a partition marked for deletion, or removed such a data file, validation
* will detect this and fail.
*
* @param snapshotId a snapshot ID; should be the ID of the snapshot current when this operation
*                   started reading the table
* @return this for method chaining
*/
ReplacePartitions validateFromSnapshot(long snapshotId);

/**
* Enables validation that deletes applied concurrently do not conflict with this commit's operation.
* <p>
* Validating concurrent deletes is required for non-idempotent replace partition operations.
* This will check whether a concurrent operation deleted data in any of the partitions being overwritten;
* if so, the replace partitions operation must be aborted to avoid un-deleting rows that were removed
* concurrently.
*
* @return this for method chaining
*/
ReplacePartitions validateNoConflictingDeletes();

/**
* Enables validation that data added concurrently does not conflict with this commit's operation.
* <p>
* Validating concurrent data files is required for non-idempotent replace partition operations.
* This will check whether a concurrent operation inserted data in any of the partitions being overwritten;
* if so, the replace partitions operation must be aborted to avoid removing rows added concurrently.
*
* @return this for method chaining
*/
ReplacePartitions validateNoConflictingData();
}
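
For context, here is a minimal usage sketch of the validated flow this interface now supports. The table handle and dataFile below are hypothetical placeholders; Table.newReplacePartitions() is the standard entry point that returns this interface.

// Sketch: dynamic partition replacement with conflict validation enabled.
// Assumes `table` is an org.apache.iceberg.Table with at least one snapshot
// and `dataFile` is a DataFile for a partition being replaced.
static void replacePartitionsValidated(Table table, DataFile dataFile) {
  long startSnapshotId = table.currentSnapshot().snapshotId(); // this job's read position
  table.newReplacePartitions()
      .validateFromSnapshot(startSnapshotId)  // only check changes after this snapshot
      .validateNoConflictingData()            // fail on concurrent inserts into replaced partitions
      .validateNoConflictingDeletes()         // fail on concurrent deletes in replaced partitions
      .addFile(dataFile)                      // drops dataFile's partition, then adds the file
      .commit();
}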
48 changes: 48 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -22,12 +22,20 @@
import java.util.List;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.PartitionSet;

public class BaseReplacePartitions
extends MergingSnapshotProducer<ReplacePartitions> implements ReplacePartitions {

private final PartitionSet replacedPartitions;
private Long startingSnapshotId;
private boolean validateConflictingData = false;
private boolean validateConflictingDeletes = false;

BaseReplacePartitions(String tableName, TableOperations ops) {
super(tableName, ops);
set(SnapshotSummary.REPLACE_PARTITIONS_PROP, "true");
replacedPartitions = PartitionSet.create(ops.current().specsById());
}

@Override
@@ -43,6 +51,7 @@ protected String operation() {
@Override
public ReplacePartitions addFile(DataFile file) {
dropPartition(file.specId(), file.partition());
replacedPartitions.add(file.specId(), file.partition());
add(file);
return this;
}
@@ -53,6 +62,45 @@ public ReplacePartitions validateAppendOnly() {
return this;
}

@Override
public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
this.startingSnapshotId = newStartingSnapshotId;
return this;
}

@Override
public ReplacePartitions validateNoConflictingDeletes() {
this.validateConflictingDeletes = true;
return this;
}

@Override
public ReplacePartitions validateNoConflictingData() {
this.validateConflictingData = true;
return this;
}

@Override
public void validate(TableMetadata currentMetadata) {
if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
} else {
validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);
}
}

if (validateConflictingDeletes) {
if (dataSpec().isUnpartitioned()) {
validateDeletedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
} else {
validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions);
}
}
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
if (dataSpec().fields().size() <= 0) {
Contributor comment:
Not directly related to this PR. It seems the implementation behaves differently when the current spec is unpartitioned. If we have multiple specs and the current one is partitioned, it will only replace partitions in the current spec without touching old specs. If the current spec is unpartitioned, though, it will replace everything.

Is that expected, @rdblue?

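For intuition: the PartitionSet built in the constructor and populated in addFile is keyed by (spec ID, partition tuple), which is what lets validate(TableMetadata) scope its checks to only the partitions this operation replaced. A small sketch of that bookkeeping, where table, newFiles, and concurrentFile are hypothetical placeholders:

// Sketch: tracking replaced partitions, possibly across multiple specs.
// Assumed imports: java.util.Map, org.apache.iceberg.util.PartitionSet.
static boolean conflicts(Table table, Iterable<DataFile> newFiles, DataFile concurrentFile) {
  Map<Integer, PartitionSpec> specsById = table.specs();
  PartitionSet replaced = PartitionSet.create(specsById);
  for (DataFile file : newFiles) {
    replaced.add(file.specId(), file.partition()); // mirrors BaseReplacePartitions.addFile
  }
  // A concurrently written file conflicts only if it lands in a replaced partition.
  return replaced.contains(concurrentFile.specId(), concurrentFile.partition());
}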
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -54,6 +54,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.Tasks;

@@ -331,6 +332,7 @@ static class Builder {
private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
private Expression partitionFilter = Expressions.alwaysTrue();
private PartitionSet partitionSet = null;
private boolean caseSensitive = true;
private ExecutorService executorService = null;

@@ -359,6 +361,11 @@ Builder filterPartitions(Expression newPartitionFilter) {
return this;
}

Builder filterPartitions(PartitionSet newPartitionSet) {
this.partitionSet = newPartitionSet;
return this;
}

Builder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
@@ -471,6 +478,7 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea
ManifestFiles.readDeleteManifest(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.filterPartitions(partitionSet)
.caseSensitive(caseSensitive)
.liveEntries()
);
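The new overload simply threads a PartitionSet down to each delete-manifest reader. A hedged sketch of driving the builder chain follows; DeleteFileIndex and its Builder are package-private, so this assumes code living in the org.apache.iceberg package, and the builderFor arguments reflect the class's typical use rather than anything introduced by this change:

// Sketch: building a delete-file index restricted to the replaced partitions.
// Assumed imports: org.apache.iceberg.io.FileIO, org.apache.iceberg.util.PartitionSet.
static DeleteFileIndex buildIndex(FileIO io, Iterable<ManifestFile> deleteManifests,
                                  Table table, PartitionSet replacedPartitions) {
  return DeleteFileIndex.builderFor(io, deleteManifests)
      .specsById(table.specs())
      .filterPartitions(replacedPartitions) // the PartitionSet overload added above
      .caseSensitive(true)
      .build();
}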
17 changes: 15 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionSet;

import static org.apache.iceberg.expressions.Expressions.alwaysTrue;

@@ -80,6 +81,7 @@ private String fileClass() {
private final Schema fileSchema;

// updated by configuration methods
private PartitionSet partitionSet = null;
private Expression partFilter = alwaysTrue();
private Expression rowFilter = alwaysTrue();
private Schema fileProjection = null;
@@ -158,6 +160,11 @@ public ManifestReader<F> filterPartitions(Expression expr) {
return this;
}

public ManifestReader<F> filterPartitions(PartitionSet partitions) {
this.partitionSet = partitions;
return this;
}

public ManifestReader<F> filterRows(Expression expr) {
this.rowFilter = Expressions.and(rowFilter, expr);
return this;
@@ -170,7 +177,8 @@ public ManifestReader<F> caseSensitive(boolean isCaseSensitive) {

CloseableIterable<ManifestEntry<F>> entries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
(partFilter != null && partFilter != Expressions.alwaysTrue()) ||
(partitionSet != null)) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

@@ -182,12 +190,17 @@ CloseableIterable<ManifestEntry<F>> entries() {
open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));
metricsEvaluator.eval(entry.file()) &&
inPartitionSet(entry.file()));
} else {
return open(projection(fileSchema, fileProjection, columns, caseSensitive));
}
}

private boolean inPartitionSet(F fileToCheck) {
return partitionSet == null || partitionSet.contains(fileToCheck.specId(), fileToCheck.partition());
}

private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
FileFormat format = FileFormat.fromFileName(file.location());
Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
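Taken together with the entries() change above, here is a sketch of what the new filter does when reading a manifest; the manifest, io, specsById, and replaced arguments are placeholders:

// Sketch: reading a manifest with the new partition-set filter. Entries whose
// (specId, partition) pair is outside the set are dropped by inPartitionSet.
// Assumed imports: java.io.IOException, java.util.Map,
// org.apache.iceberg.io.FileIO, org.apache.iceberg.util.PartitionSet.
static void scanReplaced(ManifestFile manifest, FileIO io,
                         Map<Integer, PartitionSpec> specsById,
                         PartitionSet replaced) throws IOException {
  try (ManifestReader<DataFile> reader =
           ManifestFiles.read(manifest, io, specsById).filterPartitions(replaced)) {
    for (DataFile file : reader) {
      // only files whose partitions are in the replaced set reach here
    }
  }
}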