Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,16 @@ RewriteFiles rewriteFiles(Set<DataFile> dataFilesToReplace, Set<DeleteFile> dele
* @return this for method chaining
*/
RewriteFiles validateFromSnapshot(long snapshotId);

/**
* Add data file paths that must not be rewritten by conflicting commits for this operation to succeed.
* <p>
* If any path has been rewritten in a replace by a conflicting commit in the table since the snapshot passed to
* {@link #validateFromSnapshot(long)}, the operation will fail with a
* {@link org.apache.iceberg.exceptions.ValidationException}.
*
* @param referencedFiles file paths that are referenced by a position delete file
* @return this for method chaining
*/
RewriteFiles validateDataFilesNotRewritten(Iterable<? extends CharSequence> referencedFiles);
}
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private Long startingSnapshotId = null;
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
private boolean validateDeletes = false;

BaseRewriteFiles(String tableName, TableOperations ops) {
super(tableName, ops);
Expand Down Expand Up @@ -97,11 +100,21 @@ public RewriteFiles validateFromSnapshot(long snapshotId) {
return this;
}

@Override
public RewriteFiles validateDataFilesNotRewritten(Iterable<? extends CharSequence> referencedFiles) {
referencedFiles.forEach(referencedDataFiles::add);
return this;
}

@Override
protected void validate(TableMetadata base) {
if (replacedDataFiles.size() > 0) {
if (!replacedDataFiles.isEmpty()) {
// if there are replaced data files, there cannot be any new row-level deletes for those data files
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles);
}

if (!referencedDataFiles.isEmpty()) {
validateDataFilesNotRewritten(base, startingSnapshotId, referencedDataFiles);
}
}
}
20 changes: 15 additions & 5 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// delete files can be added in "overwrite" or "delete" operations
private static final Set<String> VALIDATE_REPLACED_DATA_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);
private static final Set<String> VALIDATE_DATA_FILES_NOT_REWRITTEN_OPERATIONS =
ImmutableSet.of(DataOperations.REPLACE);

private final String tableName;
private final TableOperations ops;
Expand Down Expand Up @@ -316,18 +318,26 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
}
}

@SuppressWarnings("CollectionUndefinedEquality")
protected void validateDataFilesNotRewritten(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles) {
validateDataFilesExist(base, startingSnapshotId, requiredDataFiles, VALIDATE_DATA_FILES_NOT_REWRITTEN_OPERATIONS);
}

protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles, boolean skipDeletes) {
validateDataFilesExist(base, startingSnapshotId, requiredDataFiles, skipDeletes ?
VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS :
VALIDATE_DATA_FILES_EXIST_OPERATIONS);
}

@SuppressWarnings("CollectionUndefinedEquality")
private void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles, Set<String> matchingOperations) {
// if there is no current table state, no files have been removed
if (base.currentSnapshot() == null) {
return;
}

Set<String> matchingOperations = skipDeletes ?
VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS :
VALIDATE_DATA_FILES_EXIST_OPERATIONS;

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA);
List<ManifestFile> manifests = history.first();
Expand Down
111 changes: 111 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,4 +641,115 @@ public void testNewDeleteFile() {
.rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2))
.apply();
}

@Test
public void testRewriteReferencedDataFile() {
Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1);

table.newAppend()
.appendFile(FILE_A)
.commit();

table.newRowDelta()
.addDeletes(FILE_A_DELETES)
.commit();

long snapshotBeforeDeleteRewrite = table.currentSnapshot().snapshotId();

// simulate rewrite deletes in FILE_A_DELETES to FILE_B_DELETES
table.newRewrite()
.validateFromSnapshot(snapshotBeforeDeleteRewrite)
.validateDataFilesNotRewritten(Sets.newSet(FILE_A.path()))
.rewriteFiles(Sets.newSet(), Sets.newSet(FILE_A_DELETES), Sets.newSet(), Sets.newSet(FILE_B_DELETES))
.commit();

long snapshotBeforeRewriteFileA = table.currentSnapshot().snapshotId();

// rewrite FILE_A as FILE_A2
table.newRewrite()
.validateFromSnapshot(table.currentSnapshot().snapshotId())
.rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2))
.commit();

AssertHelpers.assertThrows("Should fail because a referenced file was rewritten",
ValidationException.class, "Cannot commit, missing data files",
() -> table.newRewrite()
.validateFromSnapshot(snapshotBeforeRewriteFileA)
.validateDataFilesNotRewritten(Sets.newSet(FILE_A.path()))
.rewriteFiles(Sets.newSet(), Sets.newSet(FILE_B_DELETES), Sets.newSet(), Sets.newSet(FILE_A_DELETES))
.apply());
}

@Test
public void testOverwriteReferencedDataFile() {
Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1);

table.newAppend()
.appendFile(FILE_A)
.commit();

table.newRowDelta()
.addDeletes(FILE_A_DELETES)
.commit();

long snapshotBeforeDeleteRewrite = table.currentSnapshot().snapshotId();

// simulate rewrite deletes in FILE_A_DELETES to FILE_B_DELETES
table.newRewrite()
.validateFromSnapshot(snapshotBeforeDeleteRewrite)
.validateDataFilesNotRewritten(Sets.newSet(FILE_A.path()))
.rewriteFiles(Sets.newSet(), Sets.newSet(FILE_A_DELETES), Sets.newSet(), Sets.newSet(FILE_B_DELETES))
.commit();

long snapshotBeforeOverwriteFileA = table.currentSnapshot().snapshotId();

// overwrite FILE_A with FILE_A2
table.newOverwrite()
.deleteFile(FILE_A)
.addFile(FILE_A2)
.commit();

// the rewrite succeeds because the overwrite is required to read FILE_A correctly
table.newRewrite()
.validateFromSnapshot(snapshotBeforeOverwriteFileA)
.validateDataFilesNotRewritten(Sets.newSet(FILE_A.path()))
.rewriteFiles(Sets.newSet(), Sets.newSet(FILE_B_DELETES), Sets.newSet(), Sets.newSet(FILE_A_DELETES))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reconsidered the comment, I agree that conflicts between REPLACE and OVERWRITE/INSERT (say conflict-1) is another different story compared to conflicts between REPLACE and REPLACE (say conflict-2).

For this case The REPLACE operation remove the data files that was relied by other committed APPEND/OVERWRITE/DELTE operations, both conflict-1 and conflict-2 should be avoided because both of them will lead to incorrect data set.

For the other case The APPEND/OVERWRITE/DELETE operations removed the data files that was relied by other committing REPLACE operation, conflict-1 won't lead to incorrect data set although there will be some remaining dangling positional deletes (as you said in this comment). but it's possible to lead to incorrect data set when considering conflict-2:

Step.1 : Table has FILE_A and EQ_DELETE_FILE_A;
Step.2 : RewriteAction_1 rewrite the FILE_A to another FILE_B - not commit;
Step.3 : RewriteAction_2 rewrite the EQ_DELETE_FILE_A to POS_DELETE_FILE_C which reference to FILE_A - not commit.
Step.4. : Committed RewriteAction_1 ;
Step.5 : Committed RewriteAction_2.

In the end, the POS_DELETE_FILE_C won't be able to apply to the newly rewritten FILE_B, which create the incorrect data set. Using older sequence number for RewriteAction also cannot fix this bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the case I was thinking about in my last comment on the other PR. In order for RewriteAction_1 to be valid, it must reuse the sequence from FILE_A for FILE_B. Otherwise, the rewrite on its own would have un-deleted a row because EQ_DELETE_FILE_A would no longer apply.

The validation in this PR can catch this case because the files referenced by POS_DELETE_FILE_C would be passed to the validation. That's FILE_A in this case and the commit for RewriteAction_2 would check that FILE_A still exists and would fail. I'm fine merging this PR if you think that this is something that may happen.

But, I think that it is really unlikely that rewrites will alter sequence numbers to avoid applying deletes. That makes little sense because you may as well apply deletes as long as you're rewriting the data. But assuming that you wanted to, this may not even be possible if the files that are rewritten are from different sequence numbers, with different sets of equality delete files that must be applied. I think a far better option is to apply the deletes when rewriting.

I'm fine merging this if you think we need it. I'll remove the draft status so that we can.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the assumptions we made while implementing the new data compaction is that we will always apply deletes while compacting data and will use a new sequence number for compacted files to make sure no deletes apply to them. Is there a good use case not to do that?

But assuming that you wanted to, this may not even be possible if the files that are rewritten are from different sequence numbers, with different sets of equality delete files that must be applied.

Well, if we wanted to compact data files without applying deletes, the implementation would have to be really complicated to address the point above.

If there is a use case (even though there is no implementation yet), I think we should add this validation. If not, I'd probably skip it. I don't have a use case in mind right now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @RussellSpitzer, any use case you have?

.apply();
}

@Test
public void testDeleteReferencedDataFile() {
Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1);

table.newAppend()
.appendFile(FILE_A)
.commit();

table.newRowDelta()
.addDeletes(FILE_A_DELETES)
.commit();

long snapshotBeforeDeleteRewrite = table.currentSnapshot().snapshotId();

// simulate rewrite deletes in FILE_A_DELETES to FILE_B_DELETES
table.newRewrite()
.validateFromSnapshot(snapshotBeforeDeleteRewrite)
.validateDataFilesNotRewritten(Sets.newSet(FILE_A.path()))
.rewriteFiles(Sets.newSet(), Sets.newSet(FILE_A_DELETES), Sets.newSet(), Sets.newSet(FILE_B_DELETES))
.commit();

long snapshotBeforeDeleteFileA = table.currentSnapshot().snapshotId();

// delete FILE_A
table.newDelete()
.deleteFile(FILE_A)
.commit();

// rewrite deletes, but ignore that FILE_A was removed
table.newRewrite()
.validateFromSnapshot(snapshotBeforeDeleteFileA)
.validateDataFilesNotRewritten(Sets.newSet(FILE_A.path()))
.rewriteFiles(Sets.newSet(), Sets.newSet(FILE_B_DELETES), Sets.newSet(), Sets.newSet(FILE_A_DELETES))
.apply();
}
}