diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index ff9174a84399..010df8cf5da2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -50,7 +50,6 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; @@ -282,9 +281,14 @@ private void commitDeltaTxn(NavigableMap pendingResults, Stri // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the // merged one will lead to the incorrect delete semantic. WriteResult result = e.getValue(); - RowDelta rowDelta = table.newRowDelta() - .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles())) - .validateDeletedFiles(); + + // Row delta validations are not needed for streaming changes that write equality deletes. Equality deletes + // are applied to data in all previous sequence numbers, so retries may push deletes further in the future, + // but do not affect correctness. Position deletes committed to the table in this path are used only to delete + // rows from data files that are being added in this commit. There is no way for data files added along with + // the delete files to be concurrently removed, so there is no need to validate the files referenced by the + // position delete files that are being committed. + RowDelta rowDelta = table.newRowDelta(); int numDataFiles = result.dataFiles().length; Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 34785cfb6a34..8a94beea12d4 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -679,66 +679,6 @@ public void testDeleteFiles() throws Exception { } } - @Test - public void testValidateDataFileExist() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); - long timestamp = 0; - long checkpoint = 10; - JobID jobId = new JobID(); - FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); - - RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); - DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1)); - - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { - harness.setup(); - harness.open(); - - // Txn#1: insert the row <1, 'aaa'> - harness.processElement(WriteResult.builder() - .addDataFiles(dataFile1) - .build(), - ++timestamp); - harness.snapshot(checkpoint, ++timestamp); - harness.notifyOfCompletedCheckpoint(checkpoint); - - // Txn#2: Overwrite the committed data-file-1 - RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); - DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2)); - new TestTableLoader(tablePath) - .loadTable() - .newOverwrite() - .addFile(dataFile2) - .deleteFile(dataFile1) - .commit(); - } - - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { - harness.setup(); - harness.open(); - - // Txn#3: position-delete the <1, 'aaa'> (NOT committed). - DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory, - "pos-delete-file-1", - ImmutableList.of(Pair.of(dataFile1.path(), 0L))); - harness.processElement(WriteResult.builder() - .addDeleteFiles(deleteFile1) - .addReferencedDataFiles(dataFile1.path()) - .build(), - ++timestamp); - harness.snapshot(++checkpoint, ++timestamp); - - // Txn#3: validate will be failure when committing. - final long currentCheckpointId = checkpoint; - AssertHelpers.assertThrows("Validation should be failure because of non-exist data files.", - ValidationException.class, "Cannot commit, missing data files", - () -> { - harness.notifyOfCompletedCheckpoint(currentCheckpointId); - return null; - }); - } - } - @Test public void testCommitTwoCheckpointsInSingleTxn() throws Exception { Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);