diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index 892257b51b0c..abfb71ceac84
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -31,7 +31,7 @@ public class BaseReplacePartitions extends MergingSnapshotProducer<ReplacePartitions>
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
-  private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
   private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class);
   private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
   private static final WriteResult EMPTY_WRITE_RESULT =
@@ -79,13 +79,9 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
           .addDeleteFiles(Lists.newArrayList())
           .build();

-  private static final long INITIAL_CHECKPOINT_ID = -1L;
-
   @VisibleForTesting
   static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";

-  private static final String FLINK_JOB_ID = "flink.job-id";
-  private static final String OPERATOR_ID = "flink.operator-id";
   private final Map<String, String> snapshotProperties;
   private final boolean replacePartitions;
   private final DynamicCommitterMetrics committerMetrics;
@@ -138,7 +134,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
       Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
       long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(
+          MaxCommittedCheckpointIdValidator.getMaxCommittedCheckpointId(
               table, last.jobId(), last.operatorId(), entry.getKey().branch());
       // Mark the already committed FilesCommittable(s) as finished
       entry
@@ -155,31 +151,6 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
     }
   }

-  private static long getMaxCommittedCheckpointId(
-      Table table, String flinkJobId, String operatorId, String branch) {
-    Snapshot snapshot = table.snapshot(branch);
-    long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
-
-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
-      String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
-      String snapshotOperatorId = summary.get(OPERATOR_ID);
-      if (flinkJobId.equals(snapshotFlinkJobId)
-          && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
-        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
-        if (value != null) {
-          lastCommittedCheckpointId = Long.parseLong(value);
-          break;
-        }
-      }
-
-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
-    }
-
-    return lastCommittedCheckpointId;
-  }
-
   /**
    * Commits the data to the Iceberg table by reading the file data from the {@link DeltaManifests}
    * ordered by the checkpointId, and writing the new snapshot to the Iceberg table. The {@link
@@ -274,9 +245,17 @@ private void replacePartitions(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
+    long checkpointId = pendingResults.lastKey();
+
     // Iceberg tables are unsorted. So the order of the append data does not matter.
     // Hence, we commit everything in one snapshot.
-    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+    ReplacePartitions dynamicOverwrite =
+        new FlinkReplacePartitions(
+                fullTableName(table),
+                tableOperations(table),
+                new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId))
+            .validateFromSnapshot(table.snapshot(branch))
+            .scanManifestsWith(workerPool);

     for (List<WriteResult> writeResults : pendingResults.values()) {
       for (WriteResult result : writeResults) {
@@ -306,7 +285,14 @@ private void commitDeltaTxn(
       long checkpointId = e.getKey();
       List<WriteResult> writeResults = e.getValue();

-      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      RowDelta rowDelta =
+          new FlinkRowDelta(
+                  fullTableName(table),
+                  tableOperations(table),
+                  new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId))
+              .validateFromSnapshot(table.snapshot(branch))
+              .scanManifestsWith(workerPool);
+
       for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that write equality deletes.
         // Equality deletes are applied to data in all previous sequence numbers, so retries may
@@ -350,9 +336,8 @@ void commitOperation(
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict with internal ones
     // used by the sink.
-    operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
-    operation.set(FLINK_JOB_ID, newFlinkJobId);
-    operation.set(OPERATOR_ID, operatorId);
+    MaxCommittedCheckpointIdValidator.setFlinkProperties(
+        operation, checkpointId, newFlinkJobId, operatorId);
     operation.toBranch(branch);

     long startNano = System.nanoTime();
@@ -370,6 +355,19 @@ void commitOperation(
     }
   }

+  private String fullTableName(Table table) {
+    return CatalogUtil.fullTableName(catalog.name(), TableIdentifier.parse(table.name()));
+  }
+
+  private static TableOperations tableOperations(Table table) {
+    if (table instanceof HasTableOperations) {
+      return ((HasTableOperations) table).operations();
+    }
+
+    throw new IllegalArgumentException(
+        "Catalog tables must implement: " + HasTableOperations.class.getSimpleName());
+  }
+
   @Override
   public void close() throws IOException {
     workerPool.shutdown();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkReplacePartitions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkReplacePartitions.java
new file mode 100644
index 000000000000..ee9ec8a2b2ab
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkReplacePartitions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.iceberg.BaseReplacePartitions;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+
+class FlinkReplacePartitions extends BaseReplacePartitions
+    implements FlinkSnapshotValidator {
+  private final Consumer<Snapshot> snapshotValidator;
+
+  private Long startingSnapshotId = null; // check all versions by default
+
+  FlinkReplacePartitions(
+      String tableName, TableOperations ops, Consumer<Snapshot> snapshotValidator) {
+    super(tableName, ops);
+    this.snapshotValidator = snapshotValidator;
+  }
+
+  @Override
+  public void validateSnapshot(Snapshot snapshot) {
+    snapshotValidator.accept(snapshot);
+  }
+
+  @Nullable
+  @Override
+  public Long startingSnapshotId() {
+    return startingSnapshotId;
+  }
+
+  @Override
+  public void validate(TableMetadata base, Snapshot parent) {
+    super.validate(base, parent);
+    validateSnapshots(base, parent);
+  }
+
+  FlinkReplacePartitions validateFromSnapshot(@Nullable Snapshot snapshot) {
+    if (snapshot != null) {
+      super.validateFromSnapshot(snapshot.snapshotId());
+      startingSnapshotId = snapshot.snapshotId();
+    }
+
+    return this;
+  }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkRowDelta.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkRowDelta.java
new file mode 100644
index 000000000000..a0d95a7a4347
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkRowDelta.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.iceberg.BaseRowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+
+class FlinkRowDelta extends BaseRowDelta implements FlinkSnapshotValidator {
+  private final Consumer<Snapshot> snapshotValidator;
+
+  private Long startingSnapshotId = null; // check all versions by default
+
+  FlinkRowDelta(String tableName, TableOperations ops, Consumer<Snapshot> snapshotValidator) {
+    super(tableName, ops);
+    this.snapshotValidator = snapshotValidator;
+  }
+
+  @Override
+  public void validateSnapshot(Snapshot snapshot) {
+    snapshotValidator.accept(snapshot);
+  }
+
+  @Nullable
+  @Override
+  public Long startingSnapshotId() {
+    return startingSnapshotId;
+  }
+
+  @Override
+  protected void validate(TableMetadata base, Snapshot parent) {
+    super.validate(base, parent);
+    validateSnapshots(base, parent);
+  }
+
+  FlinkRowDelta validateFromSnapshot(@Nullable Snapshot snapshot) {
+    if (snapshot != null) {
+      super.validateFromSnapshot(snapshot.snapshotId());
+      startingSnapshotId = snapshot.snapshotId();
+    }
+
+    return this;
+  }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkSnapshotValidator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkSnapshotValidator.java
new file mode 100644
index 000000000000..76b71f804f26
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkSnapshotValidator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import javax.annotation.Nullable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.util.SnapshotUtil;
+
+interface FlinkSnapshotValidator {
+  void validateSnapshot(Snapshot snapshot);
+
+  @Nullable
+  Long startingSnapshotId();
+
+  default void validateSnapshots(TableMetadata base, @Nullable Snapshot parent) {
+    if (parent == null) {
+      return;
+    }
+
+    SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId(), base::snapshot)
+        .forEach(this::validateSnapshot);
+  }
+}
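Note (not part of the patch): `validateSnapshots` only re-checks snapshots that were committed after the snapshot captured by `validateFromSnapshot`; a null starting snapshot id means the whole ancestry is walked. A minimal standalone sketch of that ancestor walk follows; the class and method names here are illustrative only, and it assumes an existing `table` handle.

// Illustrative sketch only - mirrors FlinkSnapshotValidator#validateSnapshots, assuming
// SnapshotUtil.ancestorsBetween(latestSnapshotId, oldestSnapshotId, lookup) walks the ancestry
// of `latest` and stops before reaching `oldest` (a null `oldest` walks everything).
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

class AncestorWalkSketch {
  static void printSnapshotsToValidate(Table table, Long startingSnapshotId) {
    Snapshot parent = table.currentSnapshot();
    if (parent == null) {
      return; // empty table: nothing committed yet, nothing to validate
    }

    for (Snapshot snapshot :
        SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId, table::snapshot)) {
      System.out.println("would validate snapshot " + snapshot.snapshotId());
    }
  }
}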
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/MaxCommittedCheckpointIdValidator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/MaxCommittedCheckpointIdValidator.java
new file mode 100644
index 000000000000..7ea092126cde
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/MaxCommittedCheckpointIdValidator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Map;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+
+class MaxCommittedCheckpointIdValidator implements Consumer<Snapshot> {
+  private static final String FLINK_JOB_ID = "flink.job-id";
+  private static final String OPERATOR_ID = "flink.operator-id";
+  private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+  private static final long INITIAL_CHECKPOINT_ID = -1L;
+
+  private final long stagedCheckpointId;
+  private final String flinkJobId;
+  private final String flinkOperatorId;
+
+  MaxCommittedCheckpointIdValidator(
+      long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+    this.stagedCheckpointId = stagedCheckpointId;
+    this.flinkJobId = flinkJobId;
+    this.flinkOperatorId = flinkOperatorId;
+  }
+
+  @Override
+  public void accept(Snapshot snapshot) {
+    @Nullable
+    Long checkpointId = extractCommittedCheckpointId(snapshot, flinkJobId, flinkOperatorId);
+    if (checkpointId == null) {
+      return;
+    }
+
+    ValidationException.check(
+        checkpointId < stagedCheckpointId,
+        "The new parent snapshot '%s' has '%s': '%s' >= '%s' of the currently staged committable."
+            + "\nThis can happen, for example, when using the REST catalog: if the previous commit request failed"
+            + " in the Flink client but succeeded on the server after the Flink job decided to retry it with the new request."
+            + "\nFlink should retry this exception, and the committer should skip the duplicate request during the next retry.",
+        snapshot.snapshotId(),
+        MAX_COMMITTED_CHECKPOINT_ID,
+        checkpointId,
+        stagedCheckpointId);
+  }
+
+  /** TODO: Reuse {@link org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} */
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);
+
+    while (snapshot != null) {
+      @Nullable
+      Long committedCheckpointId = extractCommittedCheckpointId(snapshot, flinkJobId, operatorId);
+      if (committedCheckpointId != null) {
+        return committedCheckpointId;
+      }
+
+      Long parentSnapshotId = snapshot.parentId();
+      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
+    }
+
+    return INITIAL_CHECKPOINT_ID;
+  }
+
+  @Nullable
+  static Long extractCommittedCheckpointId(
+      Snapshot snapshot, String flinkJobId, String operatorId) {
+    Map<String, String> summary = snapshot.summary();
+    String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+    String snapshotOperatorId = summary.get(OPERATOR_ID);
+    if (flinkJobId.equals(snapshotFlinkJobId)
+        && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
+      String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+      if (value != null) {
+        return Long.parseLong(value);
+      }
+    }
+
+    return null;
+  }
+
+  static void setFlinkProperties(
+      SnapshotUpdate<?> operation, long checkpointId, String flinkJobId, String operatorId) {
+    operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+    operation.set(FLINK_JOB_ID, flinkJobId);
+    operation.set(OPERATOR_ID, operatorId);
+  }
+}
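A rough sketch of the duplicate-commit check in isolation (again not part of the patch, and assuming Mockito and AssertJ are available on the test classpath; since the validator is package-private, such a sketch would have to live in the same package). A snapshot whose summary already records the staged checkpoint id for the same job and operator must be rejected:

// Hypothetical usage sketch of MaxCommittedCheckpointIdValidator; mocks stand in for a real table.
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;

class DuplicateCheckpointSketch {
  void duplicateCheckpointIsRejected() {
    Snapshot committed = mock(Snapshot.class);
    when(committed.snapshotId()).thenReturn(1L);
    // The already-committed snapshot carries checkpoint 7 for the same job and operator.
    when(committed.summary())
        .thenReturn(
            Map.of(
                "flink.job-id", "job-1",
                "flink.operator-id", "op-1",
                "flink.max-committed-checkpoint-id", "7"));

    // Staging checkpoint 7 again (e.g. a retried commit request) must fail validation,
    // because the parent's committed checkpoint id is not strictly smaller.
    MaxCommittedCheckpointIdValidator validator =
        new MaxCommittedCheckpointIdValidator(7L, "job-1", "op-1");

    assertThatThrownBy(() -> validator.accept(committed)).isInstanceOf(ValidationException.class);
  }
}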
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 7894428a781f..e4146502bf33
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,10 +41,12 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.io.WriteResult;
@@ -56,6 +58,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 class TestDynamicCommitter {

@@ -532,11 +536,15 @@ void testTableBranchAtomicCommitWithFailures() throws Exception {
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();

-    // Second fail during commit
+    // Second fail before table update
     assertThatThrownBy(commitExecutable);
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();

-    // Third fail after commit
+    // Third fail after table update
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+    // Fourth fail after commit
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();

@@ -675,18 +683,106 @@ void testReplacePartitions() throws Exception {
                 .build());
   }

+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+
+    WriteTarget writeTarget =
+        new WriteTarget(TABLE1, SnapshotRef.MAIN_BRANCH, 42, 0, false, Sets.newHashSet(1, 2));
+    byte[] manifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+    Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    CommitHook commitHook =
+        new TestDynamicIcebergSink.DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalog(),
+                    Map.of(),
+                    overwriteMode,
+                    workerPoolSize,
+                    sinkId,
+                    committerMetrics));
+
+    DynamicCommitter mainCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    assertThatThrownBy(() -> mainCommitter.commit(commitRequests))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageMatching(
+            String.format(
+                "(?s)The new parent snapshot '\\d+' has 'flink.max-committed-checkpoint-id': '%s' >= '%s' of the currently staged committable.*",
+                checkpointId, checkpointId));
+
+    // Only one commit should succeed
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   interface CommitHook extends Serializable {
-    void beforeCommit();
+    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}

-    void duringCommit();
+    default void beforeCommitOperation() {}

-    void afterCommit();
+    default void afterCommitOperation() {}
+
+    default void afterCommit() {}
   }

   static class FailBeforeAndAfterCommit implements CommitHook {
     static boolean failedBeforeCommit;
-    static boolean failedDuringCommit;
+    static boolean failedBeforeCommitOperation;
+    static boolean failedAfterCommitOperation;
     static boolean failedAfterCommit;

     FailBeforeAndAfterCommit() {
@@ -694,7 +790,7 @@ static class FailBeforeAndAfterCommit implements CommitHook {
     }

     @Override
-    public void beforeCommit() {
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
       if (!failedBeforeCommit) {
         failedBeforeCommit = true;
         throw new RuntimeException("Failing before commit");
@@ -702,10 +798,18 @@ public void beforeCommit() {
       }
     }

     @Override
-    public void duringCommit() {
-      if (!failedDuringCommit) {
-        failedDuringCommit = true;
-        throw new RuntimeException("Failing during commit");
+    public void beforeCommitOperation() {
+      if (!failedBeforeCommitOperation) {
+        failedBeforeCommitOperation = true;
+        throw new RuntimeException("Failing before commit operation");
+      }
+    }
+
+    @Override
+    public void afterCommitOperation() {
+      if (!failedAfterCommitOperation) {
+        failedAfterCommitOperation = true;
+        throw new RuntimeException("Failing after commit operation");
       }
     }
@@ -719,7 +823,8 @@ public void afterCommit() {

     static void reset() {
       failedBeforeCommit = false;
-      failedDuringCommit = false;
+      failedBeforeCommitOperation = false;
+      failedAfterCommitOperation = false;
       failedAfterCommit = false;
     }
   }
@@ -743,7 +848,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      commitHook.beforeCommit();
+      commitHook.beforeCommit(commitRequests);
       super.commit(commitRequests);
       commitHook.afterCommit();
     }
@@ -758,9 +863,10 @@ void commitOperation(
         String newFlinkJobId,
         String operatorId,
         long checkpointId) {
+      commitHook.beforeCommitOperation();
       super.commitOperation(
           table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-      commitHook.duringCommit();
+      commitHook.afterCommitOperation();
     }
   }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b48e..f0eea33ab24a
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DistributionMode;
@@ -79,6 +82,8 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {

@@ -527,8 +532,8 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    // Allow max 3 retries to make up for the three failures we are simulating here
-    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+    // Allow max 4 retries to make up for the four failures we are simulating here
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
     env.configure(configuration);

@@ -541,13 +546,15 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception {
     final CommitHook commitHook = new FailBeforeAndAfterCommit();

     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();

     executeDynamicSink(rows, env, true, 1, commitHook);

     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }

@@ -570,6 +577,89 @@ void testCommitConcurrency() throws Exception {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }

+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+    TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+    List<DynamicIcebergDataImpl> records =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+    CommitHook duplicateCommit =
+        new DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+                    Collections.emptyMap(),
+                    overwriteMode,
+                    10,
+                    "sinkId",
+                    new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+    executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+    if (!overwriteMode) {
+      verifyResults(records);
+      assertThat(table.currentSnapshot().summary())
+          .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size())));
+    }
+
+    long totalAddedRecords =
+        Lists.newArrayList(table.snapshots()).stream()
+            .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+            .mapToLong(Long::valueOf)
+            .sum();
+    assertThat(totalAddedRecords).isEqualTo(records.size());
+  }
+
+  /**
+   * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+   * in production scenarios when using REST catalog.
+   */
+  static class DuplicateCommitHook implements CommitHook {
+    // Static to maintain state after Flink restarts
+    private static boolean hasTriggered = false;
+
+    private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+    private final List<CommitRequest<DynamicCommittable>> commitRequests;
+
+    DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+      this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+      this.commitRequests = Lists.newArrayList();
+
+      resetState();
+    }
+
+    private static void resetState() {
+      hasTriggered = false;
+    }
+
+    @Override
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> requests) {
+      if (!hasTriggered) {
+        this.commitRequests.addAll(requests);
+      }
+    }
+
+    @Override
+    public void beforeCommitOperation() {
+      if (!hasTriggered) {
+        try {
+          duplicateCommitterSupplier.get().commit(commitRequests);
+        } catch (final IOException | InterruptedException e) {
+          throw new RuntimeException("Duplicate committer failed", e);
+        }
+        commitRequests.clear();
+        hasTriggered = true;
+      }
+    }
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {

     final String tableIdentifier;
@@ -579,10 +669,7 @@ private AppendRightBeforeCommit(String tableIdentifier) {
     }

     @Override
-    public void beforeCommit() {}
-
-    @Override
-    public void duringCommit() {
+    public void beforeCommitOperation() {
      // Create a conflict
       Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
       DataFile dataFile =
               .build();
       table.newAppend().appendFile(dataFile).commit();
     }
-
-    @Override
-    public void afterCommit() {}
   }

   private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -626,8 +710,19 @@ private void executeDynamicSink(
       int parallelism,
       @Nullable CommitHook commitHook)
       throws Exception {
+    executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+  }
+
+  private void executeDynamicSink(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism,
+      @Nullable CommitHook commitHook,
+      boolean overwrite)
+      throws Exception {
     DataStream<DynamicIcebergDataImpl> dataStream =
-        env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+        env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
     env.setParallelism(parallelism);

     if (commitHook != null) {
@@ -638,6 +733,7 @@ private void executeDynamicSink(
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
           .setSnapshotProperty("commit.retry.num-retries", "0")
+          .overwrite(overwrite)
           .append();
     } else {
       DynamicIcebergSink.forInput(dataStream)
@@ -645,6 +741,7 @@ private void executeDynamicSink(
           .catalogLoader(CATALOG_EXTENSION.catalogLoader())
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
+          .overwrite(overwrite)
           .append();
     }

@@ -676,6 +773,7 @@ DynamicIcebergSink instantiateSink(
   static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {

     private final CommitHook commitHook;
+    private final boolean overwriteMode;

     CommitHookDynamicIcebergSink(
         CommitHook commitHook,
@@ -693,6 +791,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
           flinkWriteConf,
           cacheMaximumSize);
       this.commitHook = commitHook;
+      this.overwriteMode = flinkWriteConf.overwriteMode();
     }

     @Override
@@ -701,7 +800,7 @@ public Committer<DynamicCommittable> createCommitter(CommitterInitContext contex
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
-          false,
+          overwriteMode,
           10,
           "sinkId",
           new DynamicCommitterMetrics(context.metricGroup()));