Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
Expand Down Expand Up @@ -282,9 +281,14 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
// txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the
// merged one will lead to the incorrect delete semantic.
Copy link
Contributor

@kbendick kbendick Oct 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the new comment that retries may push deletes further in the future, should we possibly reword this comment here?

It states that for the sequential transactions txn1 and txn2, the equality-delete files of txn2 are required to be applied to data files from txn1.

If equality-deletes could get pushed out further due to retries and other scenarios, would it be more appropriate for the above comment to read something like equality-delete files of transaction N are required to be applied to data files from transactions strictly prior to N? There's probably a better way to phrase it, but the above comment seems to still imply that equality delete files need to be applied strictly in the next transaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay either way. This comment is saying that the inserts from one checkpoint are affected by delete files in the next checkpoint, so we have to commit them separately rather than as a single transaction. The txn numbers were already generic, since the map may have more than 2 entries.

WriteResult result = e.getValue();
RowDelta rowDelta = table.newRowDelta()
.validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
.validateDeletedFiles();

// Row delta validations are not needed for streaming changes that write equality deletes. Equality deletes
// are applied to data in all previous sequence numbers, so retries may push deletes further in the future,
// but do not affect correctness. Position deletes committed to the table in this path are used only to delete
// rows from data files that are being added in this commit. There is no way for data files added along with
// the delete files to be concurrently removed, so there is no need to validate the files referenced by the
// position delete files that are being committed.
RowDelta rowDelta = table.newRowDelta();

int numDataFiles = result.dataFiles().length;
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,66 +679,6 @@ public void testDeleteFiles() throws Exception {
}
}

// Verifies that committing position-delete files which reference a data file that has since
// been removed from the table (by a concurrent overwrite) fails row-delta validation with
// a ValidationException ("Cannot commit, missing data files").
@Test
public void testValidateDataFileExist() throws Exception {
  // Equality/position deletes require format v2; skip for v1 tables.
  Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
  long timestamp = 0;
  long checkpoint = 10;
  JobID jobId = new JobID();
  FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();

  RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
  DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1));

  try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
    harness.setup();
    harness.open();

    // Txn#1: insert the row <1, 'aaa'> and commit it through a completed checkpoint.
    harness.processElement(WriteResult.builder()
        .addDataFiles(dataFile1)
        .build(),
        ++timestamp);
    harness.snapshot(checkpoint, ++timestamp);
    harness.notifyOfCompletedCheckpoint(checkpoint);

    // Txn#2: overwrite the committed data-file-1 directly on the table (outside the sink),
    // so dataFile1 is no longer live in the table metadata.
    RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
    DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2));
    new TestTableLoader(tablePath)
        .loadTable()
        .newOverwrite()
        .addFile(dataFile2)
        .deleteFile(dataFile1)
        .commit();
  }

  // Restart the sink (new harness) to simulate a fresh job attempt committing stale deletes.
  try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
    harness.setup();
    harness.open();

    // Txn#3: position-delete the <1, 'aaa'> row, referencing the now-removed dataFile1
    // (snapshot taken but NOT committed yet).
    DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory,
        "pos-delete-file-1",
        ImmutableList.of(Pair.of(dataFile1.path(), 0L)));
    harness.processElement(WriteResult.builder()
        .addDeleteFiles(deleteFile1)
        .addReferencedDataFiles(dataFile1.path())
        .build(),
        ++timestamp);
    harness.snapshot(++checkpoint, ++timestamp);

    // Txn#3: committing must fail — the referenced data file was deleted in Txn#2, so the
    // row-delta data-file-existence validation rejects the commit.
    final long currentCheckpointId = checkpoint;
    AssertHelpers.assertThrows("Validation should be failure because of non-exist data files.",
        ValidationException.class, "Cannot commit, missing data files",
        () -> {
          harness.notifyOfCompletedCheckpoint(currentCheckpointId);
          return null;
        });
  }
}

@Test
public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
Expand Down