diff --git a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java index 7e8ab65304c5..5af9cdbcf1ba 100644 --- a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java +++ b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import java.util.function.Consumer; + /** * API for overwriting files in a table by partition. * @@ -71,6 +73,20 @@ public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> { */ ReplacePartitions validateFromSnapshot(long snapshotId); + /** + * Enables snapshot validation with a user-provided function, which must throw an exception on + * validation failures. + * + *
<p>
Clients can use this method to validate summary and other metadata of parent snapshots. + * + * @param snapshotValidator a user function to validate parent snapshots + * @return this for method chaining + */ + default ReplacePartitions validateSnapshot(Consumer<Snapshot> snapshotValidator) { + throw new UnsupportedOperationException( + getClass().getName() + " does not implement validateSnapshot"); + } + /** * Enables validation that deletes that happened concurrently do not conflict with this commit's * operation. diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java index f5f10b70f9ce..0f1d5d34063d 100644 --- a/api/src/main/java/org/apache/iceberg/RowDelta.java +++ b/api/src/main/java/org/apache/iceberg/RowDelta.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import java.util.function.Consumer; import org.apache.iceberg.expressions.Expression; /** @@ -79,6 +80,20 @@ default RowDelta removeDeletes(DeleteFile deletes) { */ RowDelta validateFromSnapshot(long snapshotId); + /** + * Enables snapshot validation with a user-provided function, which must throw an exception on + * validation failures. + * + *
<p>
Clients can use this method to validate summary and other metadata of parent snapshots. + * + * @param snapshotValidator a user function to validate parent snapshots + * @return this for method chaining + */ + default RowDelta validateSnapshot(Consumer snapshotValidator) { + throw new UnsupportedOperationException( + getClass().getName() + " does not implement validateSnapshot"); + } + /** * Enables or disables case sensitive expression binding for validations that accept expressions. * diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index 892257b51b0c..fcc3c1ce5352 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -19,6 +19,8 @@ package org.apache.iceberg; import java.util.List; +import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.util.PartitionSet; @@ -28,6 +30,7 @@ public class BaseReplacePartitions extends MergingSnapshotProducer snapshotValidator = null; private boolean validateConflictingData = false; private boolean validateConflictingDeletes = false; @@ -67,6 +70,12 @@ public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) { return this; } + @Override + public ReplacePartitions validateSnapshot(Consumer validator) { + this.snapshotValidator = validator; + return this; + } + @Override public ReplacePartitions validateNoConflictingDeletes() { this.validateConflictingDeletes = true; @@ -87,6 +96,10 @@ public BaseReplacePartitions toBranch(String branch) { @Override public void validate(TableMetadata currentMetadata, Snapshot parent) { + if (snapshotValidator != null) { + validateSnapshots(snapshotValidator, currentMetadata, startingSnapshotId, parent); + } + if (validateConflictingData) { if (dataSpec().isUnpartitioned()) { validateAddedDataFiles( diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index b819d03dd5f8..ba7103385fe5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -19,7 +19,9 @@ package org.apache.iceberg; import java.util.List; +import java.util.function.Consumer; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -30,6 +32,7 @@ public class BaseRowDelta extends MergingSnapshotProducer implements RowDelta { private Long startingSnapshotId = null; // check all versions by default + @Nullable private Consumer snapshotValidator = null; private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); private final DataFileSet removedDataFiles = DataFileSet.create(); private boolean validateDeletes = false; @@ -86,6 +89,12 @@ public RowDelta validateFromSnapshot(long snapshotId) { return this; } + @Override + public RowDelta validateSnapshot(Consumer validator) { + this.snapshotValidator = validator; + return this; + } + @Override public RowDelta validateDeletedFiles() { this.validateDeletes = true; @@ -144,6 +153,10 @@ protected void validate(TableMetadata base, Snapshot parent) { parent); } + if (snapshotValidator != null) { + 
validateSnapshots(snapshotValidator, base, startingSnapshotId, parent); + } + if (validateDeletes) { failMissingDeletePaths(); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 9ed2f4f4c0fb..c8b776e93029 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.ValidationException; @@ -674,6 +676,27 @@ protected void validateDeletedDataFiles( } } + /** + * Validates parent snapshots with a user-provided function. + * + * @param validator the validation function + * @param base table metadata to validate + * @param startingSnapshotId id of the snapshot current at the start of the operation + * @param parent ending snapshot on the branch being validated + */ + protected void validateSnapshots( + Consumer validator, + TableMetadata base, + @Nullable Long startingSnapshotId, + @Nullable Snapshot parent) { + if (parent == null) { + return; + } + + SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId, base::snapshot) + .forEach(validator); + } + /** * Returns an iterable of files matching a filter have been added to the table since a starting * snapshot. diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 54d506b66328..1a2cdd76a1a9 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -28,6 +28,8 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; @@ -40,6 +42,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.flink.sink.CommitSummary; import org.apache.iceberg.flink.sink.DeltaManifests; import org.apache.iceberg.flink.sink.DeltaManifestsSerializer; @@ -158,26 +161,36 @@ public void commit(Collection> commitRequests) private static long getMaxCommittedCheckpointId( Table table, String flinkJobId, String operatorId, String branch) { Snapshot snapshot = table.snapshot(branch); - long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID; while (snapshot != null) { - Map summary = snapshot.summary(); - String snapshotFlinkJobId = summary.get(FLINK_JOB_ID); - String snapshotOperatorId = summary.get(OPERATOR_ID); - if (flinkJobId.equals(snapshotFlinkJobId) - && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) { - String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID); - if (value != null) { - lastCommittedCheckpointId = Long.parseLong(value); - break; - } + @Nullable + Long committedCheckpointId = 
extractCommittedCheckpointId(snapshot, flinkJobId, operatorId); + if (committedCheckpointId != null) { + return committedCheckpointId; } Long parentSnapshotId = snapshot.parentId(); snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; } - return lastCommittedCheckpointId; + return INITIAL_CHECKPOINT_ID; + } + + @Nullable + private static Long extractCommittedCheckpointId( + Snapshot snapshot, String flinkJobId, String operatorId) { + Map summary = snapshot.summary(); + String snapshotFlinkJobId = summary.get(FLINK_JOB_ID); + String snapshotOperatorId = summary.get(OPERATOR_ID); + if (flinkJobId.equals(snapshotFlinkJobId) + && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) { + String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID); + if (value != null) { + return Long.parseLong(value); + } + } + + return null; } /** @@ -276,7 +289,17 @@ private void replacePartitions( String operatorId) { // Iceberg tables are unsorted. So the order of the append data does not matter. // Hence, we commit everything in one snapshot. - ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool); + long checkpointId = pendingResults.lastKey(); + ReplacePartitions dynamicOverwrite = + table + .newReplacePartitions() + .scanManifestsWith(workerPool) + .validateSnapshot( + new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId)); + @Nullable Snapshot latestSnapshot = table.snapshot(branch); + if (latestSnapshot != null) { + dynamicOverwrite = dynamicOverwrite.validateFromSnapshot(latestSnapshot.snapshotId()); + } for (List writeResults : pendingResults.values()) { for (WriteResult result : writeResults) { @@ -292,7 +315,7 @@ private void replacePartitions( "dynamic partition overwrite", newFlinkJobId, operatorId, - pendingResults.lastKey()); + checkpointId); } private void commitDeltaTxn( @@ -306,7 +329,17 @@ private void commitDeltaTxn( long checkpointId = e.getKey(); List writeResults = e.getValue(); - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + RowDelta rowDelta = + table + .newRowDelta() + .scanManifestsWith(workerPool) + .validateSnapshot( + new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId)); + @Nullable Snapshot latestSnapshot = table.snapshot(branch); + if (latestSnapshot != null) { + rowDelta = rowDelta.validateFromSnapshot(latestSnapshot.snapshotId()); + } + for (WriteResult result : writeResults) { // Row delta validations are not needed for streaming changes that write equality deletes. // Equality deletes are applied to data in all previous sequence numbers, so retries may @@ -329,6 +362,39 @@ private void commitDeltaTxn( } } + static class MaxCommittedCheckpointIdValidator implements Consumer { + private final long stagedCheckpointId; + private final String flinkJobId; + private final String flinkOperatorId; + + MaxCommittedCheckpointIdValidator( + long stagedCheckpointId, String flinkJobId, String flinkOperatorId) { + this.stagedCheckpointId = stagedCheckpointId; + this.flinkJobId = flinkJobId; + this.flinkOperatorId = flinkOperatorId; + } + + @Override + public void accept(Snapshot snapshot) { + @Nullable + Long checkpointId = extractCommittedCheckpointId(snapshot, flinkJobId, flinkOperatorId); + if (checkpointId == null) { + return; + } + + ValidationException.check( + checkpointId < stagedCheckpointId, + "The new parent snapshot '%s' has '%s': '%s' >= '%s' of the currently staged committable." 
+ + "\nThis can happen, for example, when using the REST catalog: if the previous commit request failed" + + " in the Flink client but succeeded on the server after the Flink job decided to retry it with the new request." + + "\nFlink should retry this exception, and the committer should skip the duplicate request during the next retry.", + snapshot.snapshotId(), + MAX_COMMITTED_CHECKPOINT_ID, + checkpointId, + stagedCheckpointId); + } + } + @VisibleForTesting void commitOperation( Table table, diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 7894428a781f..e4146502bf33 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -41,10 +41,12 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.sink.CommitSummary; import org.apache.iceberg.io.WriteResult; @@ -56,6 +58,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class TestDynamicCommitter { @@ -532,11 +536,15 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { assertThatThrownBy(commitExecutable); assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue(); - // Second fail during commit + // Second fail before table update assertThatThrownBy(commitExecutable); - assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue(); + assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue(); - // Third fail after commit + // Third fail after table update + assertThatThrownBy(commitExecutable); + assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue(); + + // Fourth fail after commit assertThatThrownBy(commitExecutable); assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue(); @@ -675,18 +683,106 @@ void testReplacePartitions() throws Exception { .build()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception { + Table table = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table.snapshots()).isEmpty(); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId = 1; + + WriteTarget writeTarget = + new WriteTarget(TABLE1, SnapshotRef.MAIN_BRANCH, 42, 0, false, Sets.newHashSet(1, 2)); + byte[] manifest = + aggregator.writeToManifest( + writeTarget, + Sets.newHashSet( + new DynamicWriteResult( + 
writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), + checkpointId); + + CommitRequest commitRequest1 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId)); + Collection> commitRequests = Sets.newHashSet(commitRequest1); + + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + + CommitHook commitHook = + new TestDynamicIcebergSink.DuplicateCommitHook( + () -> + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Map.of(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics)); + + DynamicCommitter mainCommitter = + new CommitHookEnabledDynamicCommitter( + commitHook, + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + assertThatThrownBy(() -> mainCommitter.commit(commitRequests)) + .isInstanceOf(ValidationException.class) + .hasMessageMatching( + String.format( + "(?s)The new parent snapshot '\\d+' has 'flink.max-committed-checkpoint-id': '%s' >= '%s' of the currently staged committable.*", + checkpointId, checkpointId)); + + // Only one commit should succeed + table.refresh(); + assertThat(table.snapshots()).hasSize(1); + assertThat(table.currentSnapshot().summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId)) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + } + interface CommitHook extends Serializable { - void beforeCommit(); + default void beforeCommit(Collection> commitRequests) {} - void duringCommit(); + default void beforeCommitOperation() {} - void afterCommit(); + default void afterCommitOperation() {} + + default void afterCommit() {} } static class FailBeforeAndAfterCommit implements CommitHook { static boolean failedBeforeCommit; - static boolean failedDuringCommit; + static boolean failedBeforeCommitOperation; + static boolean failedAfterCommitOperation; static boolean failedAfterCommit; FailBeforeAndAfterCommit() { @@ -694,7 +790,7 @@ static class FailBeforeAndAfterCommit implements CommitHook { } @Override - public void beforeCommit() { + public void beforeCommit(Collection> ignored) { if (!failedBeforeCommit) { failedBeforeCommit = true; throw new RuntimeException("Failing before commit"); @@ -702,10 +798,18 @@ public void beforeCommit() { } @Override - public void duringCommit() { - if (!failedDuringCommit) { - failedDuringCommit = true; - throw new RuntimeException("Failing during commit"); + public void beforeCommitOperation() { + if (!failedBeforeCommitOperation) { + failedBeforeCommitOperation = true; + throw new RuntimeException("Failing before commit operation"); + } + } + + @Override + public void afterCommitOperation() { + if (!failedAfterCommitOperation) { + failedAfterCommitOperation = true; + throw new RuntimeException("Failing after commit operation"); } } @@ -719,7 +823,8 @@ public void afterCommit() { static void reset() { failedBeforeCommit = false; - failedDuringCommit = false; + failedBeforeCommitOperation = false; + 
failedAfterCommitOperation = false; failedAfterCommit = false; } } @@ -743,7 +848,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter { @Override public void commit(Collection> commitRequests) throws IOException, InterruptedException { - commitHook.beforeCommit(); + commitHook.beforeCommit(commitRequests); super.commit(commitRequests); commitHook.afterCommit(); } @@ -758,9 +863,10 @@ void commitOperation( String newFlinkJobId, String operatorId, long checkpointId) { + commitHook.beforeCommitOperation(); super.commitOperation( table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId); - commitHook.duringCommit(); + commitHook.afterCommitOperation(); } } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 20fae212b48e..f0eea33ab24a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -49,6 +51,7 @@ import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DistributionMode; @@ -79,6 +82,8 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { @@ -527,8 +532,8 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception { // Configure a Restart strategy to allow recovery Configuration configuration = new Configuration(); configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); - // Allow max 3 retries to make up for the three failures we are simulating here - configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3); + // Allow max 4 retries to make up for the four failures we are simulating here + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4); configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO); env.configure(configuration); @@ -541,13 +546,15 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception { final CommitHook commitHook = new FailBeforeAndAfterCommit(); assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse(); - assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse(); + assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse(); + 
assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse(); assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse(); executeDynamicSink(rows, env, true, 1, commitHook); assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue(); - assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue(); + assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue(); + assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue(); assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue(); } @@ -570,6 +577,89 @@ void testCommitConcurrency() throws Exception { executeDynamicSink(rows, env, true, 1, commitHook); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception { + TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1"); + List records = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned())); + + CommitHook duplicateCommit = + new DuplicateCommitHook( + () -> + new DynamicCommitter( + CATALOG_EXTENSION.catalogLoader().loadCatalog(), + Collections.emptyMap(), + overwriteMode, + 10, + "sinkId", + new DynamicCommitterMetrics(new UnregisteredMetricsGroup()))); + + executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode); + + Table table = CATALOG_EXTENSION.catalog().loadTable(tableId); + + if (!overwriteMode) { + verifyResults(records); + assertThat(table.currentSnapshot().summary()) + .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size()))); + } + + long totalAddedRecords = + Lists.newArrayList(table.snapshots()).stream() + .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0")) + .mapToLong(Long::valueOf) + .sum(); + assertThat(totalAddedRecords).isEqualTo(records.size()); + } + + /** + * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen + * in production scenarios when using REST catalog. 
+ */ + static class DuplicateCommitHook implements CommitHook { + // Static to maintain state after Flink restarts + private static boolean hasTriggered = false; + + private final SerializableSupplier duplicateCommitterSupplier; + private final List> commitRequests; + + DuplicateCommitHook(SerializableSupplier duplicateCommitterSupplier) { + this.duplicateCommitterSupplier = duplicateCommitterSupplier; + this.commitRequests = Lists.newArrayList(); + + resetState(); + } + + private static void resetState() { + hasTriggered = false; + } + + @Override + public void beforeCommit(Collection> requests) { + if (!hasTriggered) { + this.commitRequests.addAll(requests); + } + } + + @Override + public void beforeCommitOperation() { + if (!hasTriggered) { + try { + duplicateCommitterSupplier.get().commit(commitRequests); + } catch (final IOException | InterruptedException e) { + throw new RuntimeException("Duplicate committer failed", e); + } + commitRequests.clear(); + hasTriggered = true; + } + } + } + private static class AppendRightBeforeCommit implements CommitHook { final String tableIdentifier; @@ -579,10 +669,7 @@ private AppendRightBeforeCommit(String tableIdentifier) { } @Override - public void beforeCommit() {} - - @Override - public void duringCommit() { + public void beforeCommitOperation() { // Create a conflict Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier)); DataFile dataFile = @@ -593,9 +680,6 @@ public void duringCommit() { .build(); table.newAppend().appendFile(dataFile).commit(); } - - @Override - public void afterCommit() {} } private void runTest(List dynamicData) throws Exception { @@ -626,8 +710,19 @@ private void executeDynamicSink( int parallelism, @Nullable CommitHook commitHook) throws Exception { + executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false); + } + + private void executeDynamicSink( + List dynamicData, + StreamExecutionEnvironment env, + boolean immediateUpdate, + int parallelism, + @Nullable CommitHook commitHook, + boolean overwrite) + throws Exception { DataStream dataStream = - env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {})); + env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {})); env.setParallelism(parallelism); if (commitHook != null) { @@ -638,6 +733,7 @@ private void executeDynamicSink( .writeParallelism(parallelism) .immediateTableUpdate(immediateUpdate) .setSnapshotProperty("commit.retry.num-retries", "0") + .overwrite(overwrite) .append(); } else { DynamicIcebergSink.forInput(dataStream) @@ -645,6 +741,7 @@ private void executeDynamicSink( .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .writeParallelism(parallelism) .immediateTableUpdate(immediateUpdate) + .overwrite(overwrite) .append(); } @@ -676,6 +773,7 @@ DynamicIcebergSink instantiateSink( static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { private final CommitHook commitHook; + private final boolean overwriteMode; CommitHookDynamicIcebergSink( CommitHook commitHook, @@ -693,6 +791,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { flinkWriteConf, cacheMaximumSize); this.commitHook = commitHook; + this.overwriteMode = flinkWriteConf.overwriteMode(); } @Override @@ -701,7 +800,7 @@ public Committer createCommitter(CommitterInitContext contex commitHook, CATALOG_EXTENSION.catalogLoader().loadCatalog(), Collections.emptyMap(), - false, + overwriteMode, 10, "sinkId", new DynamicCommitterMetrics(context.metricGroup()));
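Reviewer note, not part of the patch: the new validateSnapshot hook takes a Consumer<Snapshot> that the commit applies to every ancestor snapshot between the starting snapshot and the branch head (via SnapshotUtil.ancestorsBetween), so a caller can reject a commit based on parent snapshot summaries. A minimal usage sketch outside Flink could look like the following; the summary key "app.commit-seq", the stagedSeq argument, and the SnapshotValidatorSketch helper are hypothetical and only illustrate the same pattern DynamicCommitter applies with flink.max-committed-checkpoint-id.

import java.util.function.Consumer;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;

class SnapshotValidatorSketch {
  // Hypothetical summary property written by this client on every commit.
  private static final String COMMIT_SEQ = "app.commit-seq";

  static void commitOnce(Table table, long stagedSeq) {
    // Fail validation if any parent snapshot added since the operation started
    // already recorded a sequence number >= the one being staged (a duplicate or
    // raced commit), mirroring MaxCommittedCheckpointIdValidator above.
    Consumer<Snapshot> validator =
        snapshot -> {
          String committed = snapshot.summary().get(COMMIT_SEQ);
          ValidationException.check(
              committed == null || Long.parseLong(committed) < stagedSeq,
              "Parent snapshot %s already has %s=%s >= staged %s",
              snapshot.snapshotId(), COMMIT_SEQ, committed, stagedSeq);
        };

    RowDelta rowDelta = table.newRowDelta().validateSnapshot(validator);

    Snapshot current = table.currentSnapshot();
    if (current != null) {
      // Only ancestors committed after this snapshot are passed to the validator.
      rowDelta = rowDelta.validateFromSnapshot(current.snapshotId());
    }

    // ... add data files here, then record the sequence number and commit.
    rowDelta.set(COMMIT_SEQ, String.valueOf(stagedSeq)).commit();
  }
}

Like the Flink committer, the sketch pins the validation window with validateFromSnapshot on the current branch head and relies on the validator throwing ValidationException so a retried duplicate commit is rejected before it is applied twice.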