Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilt
protected void validate(TableMetadata base) {
if (base.currentSnapshot() != null) {
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
validateDataFilesExist(
base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does make the assumption that the conflict detection filter and referenced data files are related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a correct assumption as the conflict detection filter is our scan condition and referenced data files are data files were read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a valid assumption.

}

// TODO: does this need to check new delete files?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin

@SuppressWarnings("CollectionUndefinedEquality")
protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles, boolean skipDeletes) {
CharSequenceSet requiredDataFiles, boolean skipDeletes,
Expression conflictDetectionFilter) {
// if there is no current table state, no files have been removed
if (base.currentSnapshot() == null) {
return;
Expand All @@ -339,6 +340,10 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
.specsById(base.specsById())
.ignoreExisting();

if (conflictDetectionFilter != null) {
matchingDeletesGroup.filterData(conflictDetectionFilter);
}

try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
if (deletes.hasNext()) {
throw new ValidationException("Cannot commit, missing data files: %s",
Expand Down
121 changes: 121 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -629,4 +630,124 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() {
files(FILE_A_DELETES),
statuses(Status.ADDED));
}

@Test
public void testValidateDataFilesExistWithConflictDetectionFilter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a test where the validation fails? It looks like this one just checks that you can do isolated operations but I think we should do a conflicting test as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll add one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a negative test too.

// change the spec to be partitioned by data
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField(Expressions.ref("data"))
.commit();

// add a data file to partition A
DataFile dataFile1 = DataFiles.builder(table.spec())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

table.newAppend()
.appendFile(dataFile1)
.commit();

// add a data file to partition B
DataFile dataFile2 = DataFiles.builder(table.spec())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=b")
.withRecordCount(1)
.build();

table.newAppend()
.appendFile(dataFile2)
.commit();

// use this snapshot as the starting snapshot in rowDelta
Snapshot baseSnapshot = table.currentSnapshot();

// add a delete file for partition A
DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/data-a-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

Expression conflictDetectionFilter = Expressions.equal("data", "a");
RowDelta rowDelta = table.newRowDelta()
.addDeletes(deleteFile)
.validateDataFilesExist(ImmutableList.of(dataFile1.path()))
.validateDeletedFiles()
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingAppends(conflictDetectionFilter);

// concurrently delete the file for partition B
table.newDelete()
.deleteFile(dataFile2)
.commit();

// commit the delta for partition A
rowDelta.commit();

Assert.assertEquals("Table should have one new delete manifest",
1, table.currentSnapshot().deleteManifests().size());
ManifestFile deletes = table.currentSnapshot().deleteManifests().get(0);
validateDeleteManifest(deletes,
seqs(4),
ids(table.currentSnapshot().snapshotId()),
files(deleteFile),
statuses(Status.ADDED));
}

@Test
public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() {
// change the spec to be partitioned by data
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField(Expressions.ref("data"))
.commit();

// add a data file to partition A
DataFile dataFile1 = DataFiles.builder(table.spec())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

table.newAppend()
.appendFile(dataFile1)
.commit();

// use this snapshot as the starting snapshot in rowDelta
Snapshot baseSnapshot = table.currentSnapshot();

// add a delete file for partition A
DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/data-a-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

Expression conflictDetectionFilter = Expressions.equal("data", "a");
RowDelta rowDelta = table.newRowDelta()
.addDeletes(deleteFile)
.validateDataFilesExist(ImmutableList.of(dataFile1.path()))
.validateDeletedFiles()
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingAppends(conflictDetectionFilter);

// concurrently delete the file for partition A
table.newDelete()
.deleteFile(dataFile1)
.commit();

AssertHelpers.assertThrows("Should fail to add deletes because data file is missing",
ValidationException.class, "Cannot commit, missing data files",
rowDelta::commit);
}
}