diff --git a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
index 2d87ebfbda25..fdf7ea1d1984 100644
--- a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
+++ b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -20,19 +20,25 @@
package org.apache.iceberg;
/**
- * Not recommended: API for overwriting files in a table by partition.
+ * API for overwriting files in a table by partition.
*
* This is provided to implement SQL compatible with Hive table operations but is not recommended.
* Instead, use the {@link OverwriteFiles overwrite API} to explicitly overwrite data.
*
+ * The default validation mode is idempotent, meaning the overwrite is
+ * correct and should be committed regardless of other concurrent changes to the table.
+ * Alternatively, this API can be configured to validate that no new data or delete files
+ * have been added to the table since a snapshot ID captured when this operation began.
+ * This is done by calling {@link #validateNoConflictingDeletes()} and {@link #validateNoConflictingData()}
+ * to ensure that no conflicting delete files or data files, respectively, have been written since the snapshot
+ * passed to {@link #validateFromSnapshot(long)}.
+ *
* This API accumulates file additions and produces a new {@link Snapshot} of the table by replacing
* all files in partitions with new data with the new additions. This operation is used to implement
* dynamic partition replacement.
*
* When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
- * This has no requirements for the latest snapshot and will not fail based on other snapshot
- * changes.
*/
public interface ReplacePartitions extends SnapshotUpdate {
/**
@@ -49,4 +55,41 @@ public interface ReplacePartitions extends SnapshotUpdate {
* @return this for method chaining
*/
ReplacePartitions validateAppendOnly();
+
+ /**
+ * Set the snapshot ID used in validations for this operation.
+ *
+ * All validations will check changes after this snapshot ID. If this is not called, validation will occur
+ * from the beginning of the table's history.
+ *
+ * This method should be called before this operation is committed.
+ * If a concurrent operation commits a new data or delete file, or removes a data file, after the given snapshot ID,
+ * and that file might contain rows matching a partition being replaced, validation will detect this and fail.
+ *
+ * @param snapshotId the snapshot ID that was current when this operation started reading the table
+ * @return this for method chaining
+ */
+ ReplacePartitions validateFromSnapshot(long snapshotId);
+
+ /**
+ * Enables validation that deletes that happened concurrently do not conflict with this commit's operation.
+ *
+ * Validating concurrent deletes is required during non-idempotent replace partition operations.
+ * This will check if a concurrent operation deletes data in any of the partitions being overwritten,
+ * as the replace partitions operation must be aborted to avoid undeleting rows that were removed concurrently.
+ *
+ * @return this for method chaining
+ */
+ ReplacePartitions validateNoConflictingDeletes();
+
+ /**
+ * Enables validation that data added concurrently does not conflict with this commit's operation.
+ *
+ * Validating concurrent data files is required during non-idempotent replace partition operations.
+ * This will check if a concurrent operation inserts data in any of the partitions being overwritten,
+ * as the replace partitions operation must be aborted to avoid removing rows added concurrently.
+ *
+ * @return this for method chaining
+ */
+ ReplacePartitions validateNoConflictingData();
}
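
For readers of this change, a minimal usage sketch of the new validation hooks on the public API; the class and method names below are illustrative, and the `table` handle and `newFile` are assumed to be provided by the caller:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;

    class ReplacePartitionsUsageSketch {
      // Capture the snapshot that was current when the overwrite started reading the table,
      // then fail the commit if conflicting data or delete files are committed after it.
      static void dynamicOverwrite(Table table, DataFile newFile) {
        long readSnapshotId = table.currentSnapshot().snapshotId();

        table.newReplacePartitions()
            .validateFromSnapshot(readSnapshotId)
            .validateNoConflictingData()
            .validateNoConflictingDeletes()
            .addFile(newFile)
            .commit();
      }
    }
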
diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index 826bc2ae94ec..fdf8d2580831 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -22,12 +22,20 @@
import java.util.List;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.util.PartitionSet;
public class BaseReplacePartitions
extends MergingSnapshotProducer<ReplacePartitions> implements ReplacePartitions {
+
+ private final PartitionSet replacedPartitions;
+ private Long startingSnapshotId;
+ private boolean validateConflictingData = false;
+ private boolean validateConflictingDeletes = false;
+
BaseReplacePartitions(String tableName, TableOperations ops) {
super(tableName, ops);
set(SnapshotSummary.REPLACE_PARTITIONS_PROP, "true");
+ replacedPartitions = PartitionSet.create(ops.current().specsById());
}
@Override
@@ -43,6 +51,7 @@ protected String operation() {
@Override
public ReplacePartitions addFile(DataFile file) {
dropPartition(file.specId(), file.partition());
+ replacedPartitions.add(file.specId(), file.partition());
add(file);
return this;
}
@@ -53,6 +62,45 @@ public ReplacePartitions validateAppendOnly() {
return this;
}
+ @Override
+ public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
+ this.startingSnapshotId = newStartingSnapshotId;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingDeletes() {
+ this.validateConflictingDeletes = true;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingData() {
+ this.validateConflictingData = true;
+ return this;
+ }
+
+ @Override
+ public void validate(TableMetadata currentMetadata) {
+ if (validateConflictingData) {
+ if (dataSpec().isUnpartitioned()) {
+ validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
+ } else {
+ validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);
+ }
+ }
+
+ if (validateConflictingDeletes) {
+ if (dataSpec().isUnpartitioned()) {
+ validateDeletedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
+ validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
+ } else {
+ validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);
+ validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions);
+ }
+ }
+ }
+
@Override
public List<ManifestFile> apply(TableMetadata base) {
if (dataSpec().fields().size() <= 0) {
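
To make the bookkeeping above concrete: `addFile` records each added file's (specId, partition) pair in a `PartitionSet`, and `validate` later scopes the conflict checks to exactly that set. A self-contained sketch of the same idea, with illustrative names and the `specsById` map assumed to come from table metadata:

    import java.util.Map;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.util.PartitionSet;

    class ReplacedPartitionTrackingSketch {
      // Collect the partitions touched by the new files; only concurrent changes that fall
      // into this set can conflict with the replace operation.
      static PartitionSet replacedPartitions(Map<Integer, PartitionSpec> specsById, Iterable<DataFile> newFiles) {
        PartitionSet replaced = PartitionSet.create(specsById);
        for (DataFile file : newFiles) {
          replaced.add(file.specId(), file.partition());
        }
        return replaced;
      }
    }
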
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 8ebdb366a284..51d0cf5d9410 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -54,6 +54,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.Tasks;
@@ -331,6 +332,7 @@ static class Builder {
private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
private Expression partitionFilter = Expressions.alwaysTrue();
+ private PartitionSet partitionSet = null;
private boolean caseSensitive = true;
private ExecutorService executorService = null;
@@ -359,6 +361,11 @@ Builder filterPartitions(Expression newPartitionFilter) {
return this;
}
+ Builder filterPartitions(PartitionSet newPartitionSet) {
+ this.partitionSet = newPartitionSet;
+ return this;
+ }
+
Builder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
@@ -471,6 +478,7 @@ private Iterable>> deleteManifestRea
ManifestFiles.readDeleteManifest(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
+ .filterPartitions(partitionSet)
.caseSensitive(caseSensitive)
.liveEntries()
);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index c779aca34944..73f4c66b6b05 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionSet;
import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
@@ -80,6 +81,7 @@ private String fileClass() {
private final Schema fileSchema;
// updated by configuration methods
+ private PartitionSet partitionSet = null;
private Expression partFilter = alwaysTrue();
private Expression rowFilter = alwaysTrue();
private Schema fileProjection = null;
@@ -158,6 +160,11 @@ public ManifestReader filterPartitions(Expression expr) {
return this;
}
+ public ManifestReader<F> filterPartitions(PartitionSet partitions) {
+ this.partitionSet = partitions;
+ return this;
+ }
+
public ManifestReader<F> filterRows(Expression expr) {
this.rowFilter = Expressions.and(rowFilter, expr);
return this;
@@ -170,7 +177,8 @@ public ManifestReader caseSensitive(boolean isCaseSensitive) {
CloseableIterable<ManifestEntry<F>> entries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
- (partFilter != null && partFilter != Expressions.alwaysTrue())) {
+ (partFilter != null && partFilter != Expressions.alwaysTrue()) ||
+ (partitionSet != null)) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
@@ -182,12 +190,17 @@ CloseableIterable> entries() {
open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
- metricsEvaluator.eval(entry.file()));
+ metricsEvaluator.eval(entry.file()) &&
+ inPartitionSet(entry.file()));
} else {
return open(projection(fileSchema, fileProjection, columns, caseSensitive));
}
}
+ private boolean inPartitionSet(F fileToCheck) {
+ return partitionSet == null || partitionSet.contains(fileToCheck.specId(), fileToCheck.partition());
+ }
+
private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
FileFormat format = FileFormat.fromFileName(file.location());
Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
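
The entry filtering added here reduces to a small predicate over a file's spec ID and partition tuple; pulled out as a standalone sketch (illustrative class name, same semantics as `inPartitionSet` above):

    import org.apache.iceberg.ContentFile;
    import org.apache.iceberg.util.PartitionSet;

    class PartitionSetPredicateSketch {
      // Keep the entry when no partition set was supplied, or when the file's
      // (specId, partition) pair is a member of the set.
      static boolean keep(PartitionSet partitionSet, ContentFile<?> file) {
        return partitionSet == null || partitionSet.contains(file.specId(), file.partition());
      }
    }
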
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index ae3fc9c78dba..82e58710c480 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -46,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -263,6 +265,29 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath, snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching given partitions have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a set of partitions to filter new conflicting data files
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ addedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting files that can contain records matching partitions %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Failed to validate no appends matching %s", partitionSet), e);
+ }
+ }
+
/**
* Validates that no files matching a filter have been added to the table since a starting snapshot.
*
@@ -272,9 +297,37 @@ private ManifestFile copyManifest(ManifestFile manifest) {
*/
protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
Expression conflictDetectionFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting files that can contain records matching %s: %s",
+ conflictDetectionFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
+ }
+ }
+
+ /**
+ * Returns an iterable of files matching a filter that have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to find new data files
+ * @param partitionSet a set of partitions to find new data files
+ */
+  private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(TableMetadata base,
+ Long startingSnapshotId,
+ Expression dataFilter,
+ PartitionSet partitionSet) {
// if there is no current table state, no files have been added
if (base.currentSnapshot() == null) {
- return;
+ return CloseableIterable.empty();
}
Pair<List<ManifestFile>, Set<Long>> history =
@@ -282,25 +335,23 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();
- ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
+ ManifestGroup manifestGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
- .filterData(conflictDetectionFilter)
.specsById(base.specsById())
.ignoreDeleted()
.ignoreExisting();
-    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictGroup.entries().iterator()) {
- if (conflicts.hasNext()) {
- throw new ValidationException("Found conflicting files that can contain records matching %s: %s",
- conflictDetectionFilter,
- Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
- }
+ if (dataFilter != null) {
+ manifestGroup = manifestGroup.filterData(dataFilter);
+ }
- } catch (IOException e) {
- throw new UncheckedIOException(
- String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
+ if (partitionSet != null) {
+ manifestGroup = manifestGroup.filterManifestEntries(entry ->
+ partitionSet.contains(entry.file().specId(), entry.file().partition()));
}
+
+ return manifestGroup.entries();
}
/**
@@ -353,13 +404,9 @@ private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
return;
}
-    Pair<List<ManifestFile>, Set<Long>> history =
-        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
-    List<ManifestFile> deleteManifests = history.first();
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null);
long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter);
-
for (DataFile dataFile : dataFiles) {
// if any delete is found that applies to files written in or before the starting snapshot, fail
DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
@@ -382,9 +429,42 @@ private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
* @param dataFilter an expression used to find new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a partition set used to find new conflicting delete files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files that have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to find delete files
+ * @param partitionSet a partition set used to find delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+ .specsById(base.specsById())
+ .build();
}
Pair<List<ManifestFile>, Set<Long>> history =
@@ -392,11 +472,99 @@ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapsho
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter);
+ return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, partitionSet);
+ }
- ValidationException.check(deletes.isEmpty(),
- "Found new conflicting delete files that can apply to records matching %s: %s",
- dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ /**
+ * Validates that no files matching a filter have been deleted from the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression dataFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that can contain records matching %s: %s",
+ dataFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no deleted data files matching %s", dataFilter), e);
+ }
+ }
+
+ /**
+ * Validates that no files in a given partition set have been deleted from the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a partition set used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long startingSnapshotId,
+ PartitionSet partitionSet) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that can apply to records matching %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s", partitionSet), e);
+ }
+ }
+
+
+ /**
+ * Returns an iterable of files matching a filter that have been deleted from the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to find deleted data files
+ * @param partitionSet a set of partitions to find deleted data files
+ */
+  private CloseableIterable<ManifestEntry<DataFile>> deletedDataFiles(TableMetadata base,
+ Long startingSnapshotId,
+ Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, no files have been deleted
+ if (base.currentSnapshot() == null) {
+ return CloseableIterable.empty();
+ }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_DATA_FILES_EXIST_OPERATIONS, ManifestContent.DATA);
+    List<ManifestFile> manifests = history.first();
+    Set<Long> newSnapshots = history.second();
+
+ ManifestGroup manifestGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
+ .caseSensitive(caseSensitive)
+ .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
+ .filterManifestEntries(entry -> entry.status().equals(ManifestEntry.Status.DELETED))
+ .specsById(base.specsById())
+ .ignoreExisting();
+
+ if (dataFilter != null) {
+ manifestGroup = manifestGroup.filterData(dataFilter);
+ }
+
+ if (partitionSet != null) {
+ manifestGroup = manifestGroup.filterManifestEntries(entry ->
+ partitionSet.contains(entry.file().specId(), entry.file().partition()));
+ }
+
+ return manifestGroup.entries();
}
protected void setNewFilesSequenceNumber(long sequenceNumber) {
@@ -413,7 +581,7 @@ private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshot
}
private DeleteFileIndex buildDeleteFileIndex(List<ManifestFile> deleteManifests, long startingSequenceNumber,
- Expression dataFilter) {
+ Expression dataFilter, PartitionSet partitionSet) {
DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
.afterSequenceNumber(startingSequenceNumber)
.caseSensitive(caseSensitive)
@@ -423,6 +591,10 @@ private DeleteFileIndex buildDeleteFileIndex(List deleteManifests,
builder.filterData(dataFilter);
}
+ if (partitionSet != null) {
+ builder.filterPartitions(partitionSet);
+ }
+
return builder.build();
}
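
These protected helpers surface conflicts as `ValidationException`, so an engine driving `ReplacePartitions` is expected to catch the exception, refresh the table, and re-plan the overwrite against the new current snapshot, which is the pattern the Spark tests later in this change follow by hand. A hedged sketch with illustrative names:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.exceptions.ValidationException;

    class ConflictRetrySketch {
      // Returns true if the overwrite committed, false if a concurrent change conflicted and
      // the caller needs to re-plan the replaced files from the refreshed table state.
      static boolean tryDynamicOverwrite(Table table, DataFile rewrittenFile) {
        long fromSnapshotId = table.currentSnapshot().snapshotId();
        try {
          table.newReplacePartitions()
              .validateFromSnapshot(fromSnapshotId)
              .validateNoConflictingData()
              .validateNoConflictingDeletes()
              .addFile(rewrittenFile)
              .commit();
          return true;
        } catch (ValidationException e) {
          table.refresh();
          return false;
        }
      }
    }
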
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionSet.java b/core/src/main/java/org/apache/iceberg/util/PartitionSet.java
index 22b2ba011ac2..f65f01c3c831 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionSet.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionSet.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.StringJoiner;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -183,6 +184,26 @@ public boolean removeAll(Collection> objects) {
return changed;
}
+ @Override
+ public String toString() {
+ StringJoiner result = new StringJoiner(", ", "[", "]");
+    for (Map.Entry<Integer, Set<StructLike>> e : partitionSetById.entrySet()) {
+ StringJoiner partitionDataJoiner = new StringJoiner(", ");
+ Types.StructType structType = partitionTypeById.get(e.getKey());
+ for (StructLike s : e.getValue()) {
+ for (int i = 0; i < structType.fields().size(); i++) {
+ StringBuilder partitionStringBuilder = new StringBuilder();
+ partitionStringBuilder.append(structType.fields().get(i).name());
+ partitionStringBuilder.append("=");
+ partitionStringBuilder.append(s.get(i, Object.class).toString());
+ partitionDataJoiner.add(partitionStringBuilder.toString());
+ }
+ }
+ result.add(partitionDataJoiner.toString());
+ }
+ return result.toString();
+ }
+
@Override
public void clear() {
partitionSetById.clear();
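
The new `toString` drives the partition rendering used in the validation error messages (and in the test expectations below). A small sketch of the expected format; the anonymous `StructLike` is a stand-in for real partition data and is purely illustrative:

    import java.util.Collections;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.PartitionSet;

    class PartitionSetToStringSketch {
      // A one-column partition tuple holding a single value, for illustration only.
      private static StructLike singleValue(Object value) {
        return new StructLike() {
          @Override
          public int size() {
            return 1;
          }

          @Override
          public <T> T get(int pos, Class<T> javaClass) {
            return javaClass.cast(value);
          }

          @Override
          public <T> void set(int pos, T newValue) {
            throw new UnsupportedOperationException();
          }
        };
      }

      public static void main(String[] args) {
        Schema schema = new Schema(Types.NestedField.required(1, "data", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();

        PartitionSet set = PartitionSet.create(Collections.singletonMap(spec.specId(), spec));
        set.add(spec.specId(), singleValue(0));
        set.add(spec.specId(), singleValue(1));

        // prints a rendering like: [data_bucket=0, data_bucket=1]
        System.out.println(set);
      }
    }
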
diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
index 30b70dd90182..e5a97e9a9a9c 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -52,6 +53,19 @@ public class TestReplacePartitions extends TableTestBase {
.withRecordCount(0)
.build();
+ static final DataFile FILE_UNPARTITIONED_A = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-unpartitioned-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ static final DeleteFile FILE_UNPARTITIONED_A_DELETES = FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+ .ofPositionDeletes()
+ .withPath("/path/to/data-unpartitioned-a-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] { 1, 2 };
@@ -253,6 +267,484 @@ public void testValidationSuccess() {
statuses(Status.ADDED, Status.ADDED));
}
+ @Test
+ public void testValidationNotInvoked() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+
+ // Two concurrent ReplacePartitions with No Validation Enabled
+ table.newReplacePartitions()
+ .addFile(FILE_E)
+ .validateFromSnapshot(base.currentSnapshot().snapshotId())
+ .commit();
+ table.newReplacePartitions()
+ .addFile(FILE_A) // Replaces FILE_E which becomes Deleted
+ .addFile(FILE_B)
+ .validateFromSnapshot(base.currentSnapshot().snapshotId())
+ .commit();
+
+ long replaceId = readMetadata().currentSnapshot().snapshotId();
+ Assert.assertEquals("Table should have 2 manifest",
+ 2, table.currentSnapshot().allManifests().size());
+ validateManifestEntries(table.currentSnapshot().allManifests().get(0),
+ ids(replaceId, replaceId),
+ files(FILE_A, FILE_B),
+ statuses(Status.ADDED, Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(1),
+ ids(replaceId),
+ files(FILE_E),
+ statuses(Status.DELETED));
+ }
+
+ @Test
+ public void testValidateWithDefaultSnapshotId() {
+ table.newReplacePartitions()
+ .addFile(FILE_A)
+ .commit();
+
+ // Concurrent Replace Partitions should fail with ValidationException
+ ReplacePartitions replace = table.newReplacePartitions();
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching partitions " +
+ "[data_bucket=0, data_bucket=1]: [/path/to/data-a.parquet]",
+ () ->
+ replace
+ .addFile(FILE_A)
+ .addFile(FILE_B)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .commit());
+ }
+
+ @Test
+ public void testConcurrentReplaceConflict() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+ // Concurrent Replace Partitions should fail with ValidationException
+ table.newReplacePartitions()
+ .addFile(FILE_A)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching partitions " +
+ "[data_bucket=0, data_bucket=1]: [/path/to/data-a.parquet]",
+ () ->
+ table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .addFile(FILE_A)
+ .addFile(FILE_B)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .commit());
+ }
+
+ @Test
+ public void testConcurrentReplaceNoConflict() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long id1 = base.currentSnapshot().snapshotId();
+
+ // Concurrent Replace Partitions should not fail if concerning different partitions
+ table.newReplacePartitions()
+ .addFile(FILE_A)
+ .commit();
+ long id2 = readMetadata().currentSnapshot().snapshotId();
+
+ table.newReplacePartitions()
+ .validateFromSnapshot(id1)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_B)
+ .commit();
+
+ long id3 = readMetadata().currentSnapshot().snapshotId();
+ Assert.assertEquals("Table should have 2 manifests",
+ 2, table.currentSnapshot().allManifests().size());
+ validateManifestEntries(table.currentSnapshot().allManifests().get(0),
+ ids(id3),
+ files(FILE_B),
+ statuses(Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(1),
+ ids(id2),
+ files(FILE_A),
+ statuses(Status.ADDED));
+ }
+
+ @Test
+ public void testConcurrentReplaceConflictNonPartitioned() {
+ Table unpartitioned = TestTables.create(
+ tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+ unpartitioned.newAppend()
+ .appendFile(FILE_UNPARTITIONED_A)
+ .commit();
+
+ TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned");
+ long replaceBaseId = replaceMetadata.currentSnapshot().snapshotId();
+
+ // Concurrent ReplacePartitions should fail with ValidationException
+ unpartitioned.newReplacePartitions()
+ .addFile(FILE_UNPARTITIONED_A)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching true: " +
+ "[/path/to/data-unpartitioned-a.parquet]",
+ () ->
+ unpartitioned.newReplacePartitions()
+ .validateFromSnapshot(replaceBaseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_UNPARTITIONED_A)
+ .commit());
+ }
+
+ @Test
+ public void testAppendReplaceConflict() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+ // Concurrent Append and ReplacePartition should fail with ValidationException
+ table.newFastAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching partitions " +
+ "[data_bucket=0, data_bucket=1]: [/path/to/data-b.parquet]",
+ () ->
+ table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_A)
+ .addFile(FILE_B)
+ .commit());
+ }
+
+ @Test
+ public void testAppendReplaceNoConflict() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long id1 = base.currentSnapshot().snapshotId();
+
+ // Concurrent Append and ReplacePartition should not conflict if concerning different partitions
+ table.newFastAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ long id2 = readMetadata().currentSnapshot().snapshotId();
+
+ table.newReplacePartitions()
+ .validateFromSnapshot(id1)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_A)
+ .commit();
+
+ long id3 = readMetadata().currentSnapshot().snapshotId();
+ Assert.assertEquals("Table should have 3 manifests",
+ 3, table.currentSnapshot().allManifests().size());
+ validateManifestEntries(table.currentSnapshot().allManifests().get(0),
+ ids(id3),
+ files(FILE_A),
+ statuses(Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(1),
+ ids(id2),
+ files(FILE_B),
+ statuses(Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(2),
+ ids(id3),
+ files(FILE_A),
+ statuses(Status.DELETED));
+ }
+
+ @Test
+ public void testAppendReplaceConflictNonPartitioned() {
+ Table unpartitioned = TestTables.create(
+ tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+ unpartitioned.newAppend()
+ .appendFile(FILE_UNPARTITIONED_A)
+ .commit();
+
+ TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned");
+ long replaceBaseId = replaceMetadata.currentSnapshot().snapshotId();
+
+ // Concurrent Append and ReplacePartitions should fail with ValidationException
+ unpartitioned.newAppend()
+ .appendFile(FILE_UNPARTITIONED_A)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching true: " +
+ "[/path/to/data-unpartitioned-a.parquet]",
+ () ->
+ unpartitioned.newReplacePartitions()
+ .validateFromSnapshot(replaceBaseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_UNPARTITIONED_A)
+ .commit());
+ }
+
+ @Test
+ public void testDeleteReplaceConflict() {
+ Assume.assumeTrue(formatVersion == 2);
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+ // Concurrent Delete and ReplacePartition should fail with ValidationException
+ table.newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .validateFromSnapshot(baseId)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found new conflicting delete files that can apply to records matching " +
+ "[data_bucket=0]: [/path/to/data-a-deletes.parquet]",
+ () ->
+ table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_A)
+ .commit());
+ }
+
+ @Test
+ public void testDeleteReplaceConflictNonPartitioned() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Table unpartitioned = TestTables.create(
+ tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+ unpartitioned.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned");
+ long replaceBaseId = replaceMetadata.currentSnapshot().snapshotId();
+
+ // Concurrent Delete and ReplacePartitions should fail with ValidationException
+ unpartitioned.newRowDelta()
+ .addDeletes(FILE_UNPARTITIONED_A_DELETES)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found new conflicting delete files that can apply to records matching true: " +
+ "[/path/to/data-unpartitioned-a-deletes.parquet]",
+ () ->
+ unpartitioned.newReplacePartitions()
+ .validateFromSnapshot(replaceBaseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_UNPARTITIONED_A)
+ .commit());
+ }
+
+ @Test
+ public void testDeleteReplaceNoConflict() {
+ Assume.assumeTrue(formatVersion == 2);
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+ long id1 = readMetadata().currentSnapshot().snapshotId();
+
+ // Concurrent Delta and ReplacePartition should not conflict if concerning different partitions
+ table.newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .validateFromSnapshot(id1)
+ .validateNoConflictingDataFiles()
+ .validateNoConflictingDeleteFiles()
+ .validateFromSnapshot(id1)
+ .commit();
+ long id2 = readMetadata().currentSnapshot().snapshotId();
+
+ table.newReplacePartitions()
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .validateFromSnapshot(id1)
+ .addFile(FILE_B)
+ .commit();
+ long id3 = readMetadata().currentSnapshot().snapshotId();
+
+ Assert.assertEquals("Table should have 3 manifest",
+ 3, table.currentSnapshot().allManifests().size());
+ validateManifestEntries(table.currentSnapshot().allManifests().get(0),
+ ids(id3),
+ files(FILE_B),
+ statuses(Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(1),
+ ids(id1),
+ files(FILE_A),
+ statuses(Status.ADDED));
+ validateDeleteManifest(table.currentSnapshot().allManifests().get(2),
+ seqs(2),
+ ids(id2),
+ files(FILE_A_DELETES),
+ statuses(Status.ADDED));
+ }
+
+ @Test
+ public void testOverwriteReplaceConflict() {
+ Assume.assumeTrue(formatVersion == 2);
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+ // Concurrent Overwrite and ReplacePartition should fail with ValidationException
+ table.newOverwrite()
+ .deleteFile(FILE_A)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting deleted files that can apply to records matching " +
+ "[data_bucket=0]: [/path/to/data-a.parquet]",
+ () ->
+ table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_A)
+ .commit());
+ }
+
+ @Test
+ public void testOverwriteReplaceNoConflict() {
+ Assume.assumeTrue(formatVersion == 2);
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+    // Concurrent Overwrite and ReplacePartition should not fail if they concern different partitions
+ table.newOverwrite()
+ .deleteFile(FILE_A)
+ .commit();
+
+ table.newReplacePartitions()
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .validateFromSnapshot(baseId)
+ .addFile(FILE_B)
+ .commit();
+
+ long finalId = readMetadata().currentSnapshot().snapshotId();
+
+ Assert.assertEquals("Table should have 2 manifest",
+ 2, table.currentSnapshot().allManifests().size());
+ validateManifestEntries(table.currentSnapshot().allManifests().get(0),
+ ids(finalId),
+ files(FILE_B),
+ statuses(Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(1),
+ ids(finalId),
+ files(FILE_B),
+ statuses(Status.DELETED));
+ }
+
+ @Test
+ public void testOverwriteReplaceConflictNonPartitioned() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ Table unpartitioned = TestTables.create(
+ tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+
+ unpartitioned.newAppend()
+ .appendFile(FILE_UNPARTITIONED_A)
+ .commit();
+
+ TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned");
+ long replaceBaseId = replaceMetadata.currentSnapshot().snapshotId();
+
+ // Concurrent Overwrite and ReplacePartitions should fail with ValidationException
+ unpartitioned.newOverwrite()
+ .deleteFile(FILE_UNPARTITIONED_A)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting deleted files that can contain records matching true: " +
+ "[/path/to/data-unpartitioned-a.parquet]",
+ () ->
+ unpartitioned.newReplacePartitions()
+ .validateFromSnapshot(replaceBaseId)
+ .validateNoConflictingData()
+ .validateNoConflictingDeletes()
+ .addFile(FILE_UNPARTITIONED_A)
+ .commit());
+ }
+
+ @Test
+ public void testValidateOnlyDeletes() {
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+ long baseId = readMetadata().currentSnapshot().snapshotId();
+
+ // Snapshot Isolation mode: appends do not conflict with replace
+ table.newAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .validateNoConflictingDeletes()
+ .addFile(FILE_B)
+ .commit();
+ long finalId = readMetadata().currentSnapshot().snapshotId();
+
+ Assert.assertEquals("Table should have 3 manifest",
+ 3, table.currentSnapshot().allManifests().size());
+ validateManifestEntries(table.currentSnapshot().allManifests().get(0),
+ ids(finalId),
+ files(FILE_B),
+ statuses(Status.ADDED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(1),
+ ids(finalId),
+ files(FILE_B),
+ statuses(Status.DELETED));
+ validateManifestEntries(table.currentSnapshot().allManifests().get(2),
+ ids(baseId),
+ files(FILE_A),
+ statuses(Status.ADDED));
+ }
+
@Test
public void testEmptyPartitionPathWithUnpartitionedTable() {
DataFiles.builder(PartitionSpec.unpartitioned())
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
new file mode 100644
index 000000000000..6be70bf6a3ae
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestConflictValidation extends SparkExtensionsTestBase {
+
+  public TestConflictValidation(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void createTables() {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg " +
+ "PARTITIONED BY (id)" +
+ "TBLPROPERTIES" +
+ "('format-version'='2'," +
+ "'write.delete.mode'='merge-on-read')", tableName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testOverwritePartitionSerializableIsolation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting deleted data files should throw exception",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching partitions [id=1]",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ }
+
+ @Test
+ public void testOverwritePartitionSnapshotIsolation() throws Exception {
+    List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(1, "b"));
+ spark.createDataFrame(records, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ // This should generate a delete file
+ sql("DELETE FROM %s WHERE data='a'", tableName);
+
+ // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting deleted data files should throw exception",
+ ValidationException.class,
+ "Found new conflicting delete files that can apply to records matching [id=1]",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+ .overwritePartitions();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+ .overwritePartitions();
+ }
+
+ @Test
+ public void testOverwritePartitionSnapshotIsolation2() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ // This should delete a data file
+ sql("DELETE FROM %s WHERE id='1'", tableName);
+
+ // Validating from previous snapshot finds conflicts
+    List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+
+ AssertHelpers.assertThrowsCause("Conflicting deleted data files should throw exception",
+ ValidationException.class,
+ "Found conflicting deleted files that can apply to records matching [id=1]",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+ .overwritePartitions();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+ .overwritePartitions();
+ }
+
+ @Test
+ public void testOverwritePartitionNoSnapshotIdValidation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting deleted data files should throw exception",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching partitions [id=1]",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long snapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index f9d268e1778d..047a0b8169ed 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -252,4 +253,17 @@ public boolean useTableDistributionAndOrdering() {
.defaultValue(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT)
.parse();
}
+
+ public Long validateFromSnapshotId() {
+ return confParser.longConf()
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID)
+ .parseOptional();
+ }
+
+ public IsolationLevel isolationLevel() {
+ String isolationLevelName = confParser.stringConf()
+ .option(SparkWriteOptions.ISOLATION_LEVEL)
+ .parseOptional();
+ return isolationLevelName != null ? IsolationLevel.fromName(isolationLevelName) : null;
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 3992b3ca0539..72de545f1298 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -72,4 +72,10 @@ private SparkWriteOptions() {
public static final String MERGE_SCHEMA = "merge-schema";
public static final String SPARK_MERGE_SCHEMA = "mergeSchema";
public static final boolean MERGE_SCHEMA_DEFAULT = false;
+
+ // Identifies snapshot from which to start validating conflicting changes
+ public static final String VALIDATE_FROM_SNAPSHOT_ID = "validate-from-snapshot-id";
+
+ // Isolation Level for DataFrame calls. Currently supported by overwritePartitions
+ public static final String ISOLATION_LEVEL = "isolation-level";
}
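
A short sketch of how these options are expected to be passed from a DataFrame write, mirroring the tests above; the DataFrame, table name, and snapshot ID are assumed to be supplied by the caller:

    import org.apache.iceberg.IsolationLevel;
    import org.apache.iceberg.spark.SparkWriteOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class DynamicOverwriteOptionsSketch {
      // Dynamic partition overwrite with conflict validation: SERIALIZABLE also rejects
      // concurrent inserts into the replaced partitions, SNAPSHOT only rejects concurrent deletes.
      static void overwritePartitions(Dataset<Row> df, String tableName, long readSnapshotId) throws Exception {
        df.writeTo(tableName)
            .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(readSnapshotId))
            .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
            .overwritePartitions();
      }
    }
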
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index c5cc2b24658e..4400a61dfaaa 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -98,6 +98,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private static final Logger LOG = LoggerFactory.getLogger(SparkWrite.class);
private final JavaSparkContext sparkContext;
+ private final SparkWriteConf writeConf;
private final Table table;
private final String queryId;
private final FileFormat format;
@@ -118,6 +119,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
Distribution requiredDistribution, SortOrder[] requiredOrdering) {
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
+ this.writeConf = writeConf;
this.queryId = writeInfo.queryId();
this.format = writeConf.dataFileFormat();
this.applicationId = applicationId;
@@ -272,6 +274,22 @@ public void commit(WriterCommitMessage[] messages) {
}
ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+ IsolationLevel isolationLevel = writeConf.isolationLevel();
+ Long validateFromSnapshotId = writeConf.validateFromSnapshotId();
+
+ if (isolationLevel == SERIALIZABLE) {
+ if (validateFromSnapshotId != null) {
+ dynamicOverwrite.validateFromSnapshot(validateFromSnapshotId);
+ }
+ dynamicOverwrite.validateNoConflictingData();
+ dynamicOverwrite.validateNoConflictingDeletes();
+
+ } else if (isolationLevel == SNAPSHOT) {
+ if (validateFromSnapshotId != null) {
+ dynamicOverwrite.validateFromSnapshot(validateFromSnapshotId);
+ }
+ dynamicOverwrite.validateNoConflictingDeletes();
+ }
int numFiles = 0;
for (DataFile file : files) {