diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d5774b66af81..61b20cb27b4b 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -37,12 +37,14 @@
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         commitRequestMap.entrySet()) {
       Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+      Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+      Iterable<Snapshot> ancestors =
+          latestSnapshot != null
+              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+              : List.of();
       long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(
-              table, last.jobId(), last.operatorId(), entry.getKey().branch());
+          getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
       // Mark the already committed FilesCommittable(s) as finished
       entry
           .getValue()
@@ -160,12 +167,11 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
   }
 
   private static long getMaxCommittedCheckpointId(
-      Table table, String flinkJobId, String operatorId, String branch) {
-    Snapshot snapshot = table.snapshot(branch);
+      Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
+    for (Snapshot ancestor : ancestors) {
+      Map<String, String> summary = ancestor.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       String snapshotOperatorId = summary.get(OPERATOR_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ private static long getMaxCommittedCheckpointId(
           break;
         }
       }
-
-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
 
     return lastCommittedCheckpointId;
@@ -347,6 +350,36 @@ private void commitDeltaTxn(
     }
   }
 
+  private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+    private MaxCommittedCheckpointMismatchException() {
+      super("Table already contains staged changes.");
+    }
+  }
+
+  private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+    private final long stagedCheckpointId;
+    private final String flinkJobId;
+    private final String flinkOperatorId;
+
+    private MaxCommittedCheckpointIdValidator(
+        long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+      this.stagedCheckpointId = stagedCheckpointId;
+      this.flinkJobId = flinkJobId;
+      this.flinkOperatorId = flinkOperatorId;
+    }
+
+    @Override
+    public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+      long maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+      if (maxCommittedCheckpointId >= stagedCheckpointId) {
+        throw new MaxCommittedCheckpointMismatchException();
+      }
+
+      return true;
+    }
+  }
+
   @VisibleForTesting
   void commitOperation(
       Table table,
@@ -372,9 +405,25 @@ void commitOperation(
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
     operation.toBranch(branch);
+    operation.validateWith(
+        new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
 
     long startNano = System.nanoTime();
-    operation.commit(); // abort is automatically called if this fails.
+    try {
+      operation.commit(); // abort is automatically called if this fails.
+    } catch (MaxCommittedCheckpointMismatchException e) {
+      LOG.info(
+          "Skipping commit operation {} because the {} branch of the {} table already contains changes for checkpoint {}."
+              + " This can occur when a failure prevents the committer from receiving confirmation of a"
+              + " successful commit, causing the Flink job to retry committing the same set of changes.",
+          description,
+          branch,
+          table.name(),
+          checkpointId,
+          e);
+      return;
+    }
+
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
     LOG.info(
         "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index d2c688e28c2a..1497458e6083 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicCommitter {
 
@@ -669,11 +672,15 @@ void testTableBranchAtomicCommitWithFailures() throws Exception {
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
 
-    // Second fail during commit
+    // Second fail before table update
     assertThatThrownBy(commitExecutable);
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
 
-    // Third fail after commit
+    // Third fail after table update
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+    // Fourth fail after commit
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
 
@@ -812,18 +819,101 @@ void testReplacePartitions() throws Exception {
             .build());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+    final String branch = SnapshotRef.MAIN_BRANCH;
+
+    WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+    byte[] manifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+    Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    CommitHook commitHook =
+        new TestDynamicIcebergSink.DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalog(),
+                    Map.of(),
+                    overwriteMode,
+                    workerPoolSize,
+                    sinkId,
+                    committerMetrics));
+
+    DynamicCommitter mainCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    mainCommitter.commit(commitRequests);
+
+    // Only one commit should succeed
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(
+            ImmutableMap.builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   interface CommitHook extends Serializable {
-    void beforeCommit();
+    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
 
-    void duringCommit();
+    default void beforeCommitOperation() {}
 
-    void afterCommit();
+    default void afterCommitOperation() {}
+
+    default void afterCommit() {}
   }
 
   static class FailBeforeAndAfterCommit implements CommitHook {
 
     static boolean failedBeforeCommit;
-    static boolean failedDuringCommit;
+    static boolean failedBeforeCommitOperation;
+    static boolean failedAfterCommitOperation;
     static boolean failedAfterCommit;
 
     FailBeforeAndAfterCommit() {
@@ -831,7 +921,7 @@ static class FailBeforeAndAfterCommit implements CommitHook {
     }
 
     @Override
-    public void beforeCommit() {
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
       if (!failedBeforeCommit) {
         failedBeforeCommit = true;
         throw new RuntimeException("Failing before commit");
@@ -839,10 +929,18 @@ public void beforeCommit() {
     }
 
     @Override
-    public void duringCommit() {
-      if (!failedDuringCommit) {
-        failedDuringCommit = true;
-        throw new RuntimeException("Failing during commit");
+    public void beforeCommitOperation() {
+      if (!failedBeforeCommitOperation) {
+        failedBeforeCommitOperation = true;
+        throw new RuntimeException("Failing before commit operation");
+      }
+    }
+
+    @Override
+    public void afterCommitOperation() {
+      if (!failedAfterCommitOperation) {
+        failedAfterCommitOperation = true;
+        throw new RuntimeException("Failing after commit operation");
       }
     }
 
@@ -856,7 +954,8 @@ public void afterCommit() {
 
     static void reset() {
       failedBeforeCommit = false;
-      failedDuringCommit = false;
+      failedBeforeCommitOperation = false;
+      failedAfterCommitOperation = false;
      failedAfterCommit = false;
     }
   }
@@ -880,7 +979,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      commitHook.beforeCommit();
+      commitHook.beforeCommit(commitRequests);
       super.commit(commitRequests);
       commitHook.afterCommit();
     }
@@ -895,9 +994,10 @@ void commitOperation(
         String newFlinkJobId,
         String operatorId,
         long checkpointId) {
+      commitHook.beforeCommitOperation();
       super.commitOperation(
           table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-      commitHook.duringCommit();
+      commitHook.afterCommitOperation();
     }
   }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index f0cc46df469e..b660d8e285d9 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DistributionMode;
@@ -81,6 +84,8 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
@@ -732,8 +737,8 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    // Allow max 3 retries to make up for the three failures we are simulating here
-    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+    // Allow max 4 retries to make up for the four failures we are simulating here
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
     env.configure(configuration);
 
@@ -746,13 +751,15 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception {
 
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -775,6 +782,90 @@ void testCommitConcurrency() throws Exception {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+    TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+    List<DynamicIcebergDataImpl> records =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+    CommitHook duplicateCommit =
+        new DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+                    Collections.emptyMap(),
+                    overwriteMode,
+                    10,
+                    "sinkId",
+                    new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+    executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+    if (!overwriteMode) {
+      verifyResults(records);
+      assertThat(table.currentSnapshot().summary())
+          .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size())));
+    }
+
+    long totalAddedRecords =
+        Lists.newArrayList(table.snapshots()).stream()
+            .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+            .mapToLong(Long::valueOf)
+            .sum();
+    assertThat(totalAddedRecords).isEqualTo(records.size());
+  }
+
+  /**
+   * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+   * in production scenarios when using REST catalog.
+   */
+  static class DuplicateCommitHook implements CommitHook {
+    // Static to maintain state after Flink restarts
+    private static boolean hasTriggered = false;
+
+    private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+    private final List<CommitRequest<DynamicCommittable>> commitRequests;
+
+    DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+      this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+      this.commitRequests = Lists.newArrayList();
+
+      resetState();
+    }
+
+    private static void resetState() {
+      hasTriggered = false;
+    }
+
+    @Override
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> requests) {
+      if (!hasTriggered) {
+        this.commitRequests.addAll(requests);
+      }
+    }
+
+    @Override
+    public void beforeCommitOperation() {
+      if (!hasTriggered) {
+        try {
+          duplicateCommitterSupplier.get().commit(commitRequests);
+        } catch (final IOException | InterruptedException e) {
+          throw new RuntimeException("Duplicate committer failed", e);
+        }
+
+        commitRequests.clear();
+        hasTriggered = true;
+      }
+    }
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -784,10 +875,7 @@ private AppendRightBeforeCommit(String tableIdentifier) {
     }
 
     @Override
-    public void beforeCommit() {}
-
-    @Override
-    public void duringCommit() {
+    public void beforeCommitOperation() {
       // Create a conflict
       Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
       DataFile dataFile =
@@ -798,9 +886,6 @@ public void duringCommit() {
               .build();
       table.newAppend().appendFile(dataFile).commit();
     }
-
-    @Override
-    public void afterCommit() {}
   }
 
   private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -831,8 +916,19 @@ private void executeDynamicSink(
       int parallelism,
       @Nullable CommitHook commitHook)
       throws Exception {
+    executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+  }
+
+  private void executeDynamicSink(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism,
+      @Nullable CommitHook commitHook,
+      boolean overwrite)
+      throws Exception {
     DataStream<DynamicIcebergDataImpl> dataStream =
-        env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+        env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
     env.setParallelism(parallelism);
 
     if (commitHook != null) {
@@ -843,6 +939,7 @@ private void executeDynamicSink(
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
           .setSnapshotProperty("commit.retry.num-retries", "0")
+          .overwrite(overwrite)
           .append();
     } else {
       DynamicIcebergSink.forInput(dataStream)
@@ -850,6 +947,7 @@ private void executeDynamicSink(
           .catalogLoader(CATALOG_EXTENSION.catalogLoader())
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
+          .overwrite(overwrite)
           .append();
     }
 
@@ -881,6 +979,7 @@ DynamicIcebergSink instantiateSink(
   static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
 
     private final CommitHook commitHook;
+    private final boolean overwriteMode;
 
     CommitHookDynamicIcebergSink(
         CommitHook commitHook,
@@ -898,6 +997,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
           flinkWriteConf,
           cacheMaximumSize);
       this.commitHook = commitHook;
+      this.overwriteMode = flinkWriteConf.overwriteMode();
     }
 
     @Override
@@ -906,7 +1006,7 @@ public Committer<DynamicCommittable> createCommitter(CommitterInitContext contex
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
-          false,
+          overwriteMode,
           10,
           "sinkId",
           new DynamicCommitterMetrics(context.metricGroup()));
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d5774b66af81..61b20cb27b4b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -37,12 +37,14 @@
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         commitRequestMap.entrySet()) {
       Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+      Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+      Iterable<Snapshot> ancestors =
+          latestSnapshot != null
+              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+              : List.of();
       long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(
-              table, last.jobId(), last.operatorId(), entry.getKey().branch());
+          getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
       // Mark the already committed FilesCommittable(s) as finished
       entry
           .getValue()
@@ -160,12 +167,11 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
   }
 
   private static long getMaxCommittedCheckpointId(
-      Table table, String flinkJobId, String operatorId, String branch) {
-    Snapshot snapshot = table.snapshot(branch);
+      Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
+    for (Snapshot ancestor : ancestors) {
+      Map<String, String> summary = ancestor.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       String snapshotOperatorId = summary.get(OPERATOR_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ private static long getMaxCommittedCheckpointId(
           break;
         }
       }
-
-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
 
     return lastCommittedCheckpointId;
@@ -347,6 +350,36 @@ private void commitDeltaTxn(
     }
   }
 
+  private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+    private MaxCommittedCheckpointMismatchException() {
+      super("Table already contains staged changes.");
+    }
+  }
+
+  private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+    private final long stagedCheckpointId;
+    private final String flinkJobId;
+    private final String flinkOperatorId;
+
+    private MaxCommittedCheckpointIdValidator(
+        long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+      this.stagedCheckpointId = stagedCheckpointId;
+      this.flinkJobId = flinkJobId;
+      this.flinkOperatorId = flinkOperatorId;
+    }
+
+    @Override
+    public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+      long maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+      if (maxCommittedCheckpointId >= stagedCheckpointId) {
+        throw new MaxCommittedCheckpointMismatchException();
+      }
+
+      return true;
+    }
+  }
+
   @VisibleForTesting
   void commitOperation(
       Table table,
@@ -372,9 +405,25 @@ void commitOperation(
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
     operation.toBranch(branch);
+    operation.validateWith(
+        new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
 
     long startNano = System.nanoTime();
-    operation.commit(); // abort is automatically called if this fails.
+    try {
+      operation.commit(); // abort is automatically called if this fails.
+    } catch (MaxCommittedCheckpointMismatchException e) {
+      LOG.info(
+          "Skipping commit operation {} because the {} branch of the {} table already contains changes for checkpoint {}."
+              + " This can occur when a failure prevents the committer from receiving confirmation of a"
+              + " successful commit, causing the Flink job to retry committing the same set of changes.",
+          description,
+          branch,
+          table.name(),
+          checkpointId,
+          e);
+      return;
+    }
+
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
     LOG.info(
         "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 16c0cb8e6c73..5f938d4e8827 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicCommitter {
 
@@ -669,11 +672,15 @@ void testTableBranchAtomicCommitWithFailures() throws Exception {
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
 
-    // Second fail during commit
+    // Second fail before table update
     assertThatThrownBy(commitExecutable);
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
 
-    // Third fail after commit
+    // Third fail after table update
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+    // Fourth fail after commit
     assertThatThrownBy(commitExecutable);
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
 
@@ -877,18 +884,101 @@ void testReplacePartitions() throws Exception {
             .build());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+    final String branch = SnapshotRef.MAIN_BRANCH;
+
+    WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+    byte[] manifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+    Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    CommitHook commitHook =
+        new TestDynamicIcebergSink.DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalog(),
+                    Map.of(),
+                    overwriteMode,
+                    workerPoolSize,
+                    sinkId,
+                    committerMetrics));
+
+    DynamicCommitter mainCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    mainCommitter.commit(commitRequests);
+
+    // Only one commit should succeed
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(
+            ImmutableMap.builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
+
   interface CommitHook extends Serializable {
-    void beforeCommit();
+    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
 
-    void duringCommit();
+    default void beforeCommitOperation() {}
 
-    void afterCommit();
+    default void afterCommitOperation() {}
+
+    default void afterCommit() {}
   }
 
   static class FailBeforeAndAfterCommit implements CommitHook {
 
     static boolean failedBeforeCommit;
-    static boolean failedDuringCommit;
+    static boolean failedBeforeCommitOperation;
+    static boolean failedAfterCommitOperation;
     static boolean failedAfterCommit;
 
     FailBeforeAndAfterCommit() {
@@ -896,7 +986,7 @@ static class FailBeforeAndAfterCommit implements CommitHook {
     }
 
     @Override
-    public void beforeCommit() {
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
       if (!failedBeforeCommit) {
         failedBeforeCommit = true;
         throw new RuntimeException("Failing before commit");
@@ -904,10 +994,18 @@ public void beforeCommit() {
     }
 
     @Override
-    public void duringCommit() {
-      if (!failedDuringCommit) {
-        failedDuringCommit = true;
-        throw new RuntimeException("Failing during commit");
+    public void beforeCommitOperation() {
+      if (!failedBeforeCommitOperation) {
+        failedBeforeCommitOperation = true;
+        throw new RuntimeException("Failing before commit operation");
+      }
+    }
+
+    @Override
+    public void afterCommitOperation() {
+      if (!failedAfterCommitOperation) {
+        failedAfterCommitOperation = true;
+        throw new RuntimeException("Failing after commit operation");
       }
     }
 
@@ -921,7 +1019,8 @@ public void afterCommit() {
 
     static void reset() {
       failedBeforeCommit = false;
-      failedDuringCommit = false;
+      failedBeforeCommitOperation = false;
+      failedAfterCommitOperation = false;
       failedAfterCommit = false;
     }
   }
@@ -945,7 +1044,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      commitHook.beforeCommit();
+      commitHook.beforeCommit(commitRequests);
       super.commit(commitRequests);
       commitHook.afterCommit();
     }
@@ -960,9 +1059,10 @@ void commitOperation(
         String newFlinkJobId,
         String operatorId,
         long checkpointId) {
+      commitHook.beforeCommitOperation();
       super.commitOperation(
           table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-      commitHook.duringCommit();
+      commitHook.afterCommitOperation();
     }
   }
 }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index f0cc46df469e..b660d8e285d9 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DistributionMode;
@@ -81,6 +84,8 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
 
@@ -732,8 +737,8 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception {
     // Configure a Restart strategy to allow recovery
     Configuration configuration = new Configuration();
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-    // Allow max 3 retries to make up for the three failures we are simulating here
-    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+    // Allow max 4 retries to make up for the four failures we are simulating here
+    configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
     configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
     env.configure(configuration);
 
@@ -746,13 +751,15 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception {
 
     final CommitHook commitHook = new FailBeforeAndAfterCommit();
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
 
     executeDynamicSink(rows, env, true, 1, commitHook);
 
     assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
     assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
   }
 
@@ -775,6 +782,90 @@ void testCommitConcurrency() throws Exception {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+    TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+    List<DynamicIcebergDataImpl> records =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+    CommitHook duplicateCommit =
+        new DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+                    Collections.emptyMap(),
+                    overwriteMode,
+                    10,
+                    "sinkId",
+                    new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+    executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+    if (!overwriteMode) {
+      verifyResults(records);
+      assertThat(table.currentSnapshot().summary())
+          .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size())));
+    }
+
+    long totalAddedRecords =
+        Lists.newArrayList(table.snapshots()).stream()
+            .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+            .mapToLong(Long::valueOf)
+            .sum();
+    assertThat(totalAddedRecords).isEqualTo(records.size());
+  }
+
+  /**
+   * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+   * in production scenarios when using REST catalog.
+   */
+  static class DuplicateCommitHook implements CommitHook {
+    // Static to maintain state after Flink restarts
+    private static boolean hasTriggered = false;
+
+    private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+    private final List<CommitRequest<DynamicCommittable>> commitRequests;
+
+    DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+      this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+      this.commitRequests = Lists.newArrayList();
+
+      resetState();
+    }
+
+    private static void resetState() {
+      hasTriggered = false;
+    }
+
+    @Override
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> requests) {
+      if (!hasTriggered) {
+        this.commitRequests.addAll(requests);
+      }
+    }
+
+    @Override
+    public void beforeCommitOperation() {
+      if (!hasTriggered) {
+        try {
+          duplicateCommitterSupplier.get().commit(commitRequests);
+        } catch (final IOException | InterruptedException e) {
+          throw new RuntimeException("Duplicate committer failed", e);
+        }
+
+        commitRequests.clear();
+        hasTriggered = true;
+      }
+    }
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
@@ -784,10 +875,7 @@ private AppendRightBeforeCommit(String tableIdentifier) {
     }
 
     @Override
-    public void beforeCommit() {}
-
-    @Override
-    public void duringCommit() {
+    public void beforeCommitOperation() {
       // Create a conflict
       Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
       DataFile dataFile =
@@ -798,9 +886,6 @@ public void duringCommit() {
              .build();
       table.newAppend().appendFile(dataFile).commit();
     }
-
-    @Override
-    public void afterCommit() {}
   }
 
   private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -831,8 +916,19 @@ private void executeDynamicSink(
       int parallelism,
       @Nullable CommitHook commitHook)
       throws Exception {
+    executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+  }
+
+  private void executeDynamicSink(
+      List<DynamicIcebergDataImpl> dynamicData,
+      StreamExecutionEnvironment env,
+      boolean immediateUpdate,
+      int parallelism,
+      @Nullable CommitHook commitHook,
+      boolean overwrite)
+      throws Exception {
     DataStream<DynamicIcebergDataImpl> dataStream =
-        env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+        env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
     env.setParallelism(parallelism);
 
     if (commitHook != null) {
@@ -843,6 +939,7 @@ private void executeDynamicSink(
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
           .setSnapshotProperty("commit.retry.num-retries", "0")
+          .overwrite(overwrite)
           .append();
     } else {
       DynamicIcebergSink.forInput(dataStream)
@@ -850,6 +947,7 @@ private void executeDynamicSink(
           .catalogLoader(CATALOG_EXTENSION.catalogLoader())
           .writeParallelism(parallelism)
           .immediateTableUpdate(immediateUpdate)
+          .overwrite(overwrite)
           .append();
     }
 
@@ -881,6 +979,7 @@ DynamicIcebergSink instantiateSink(
   static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
 
     private final CommitHook commitHook;
+    private final boolean overwriteMode;
 
     CommitHookDynamicIcebergSink(
         CommitHook commitHook,
@@ -898,6 +997,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
           flinkWriteConf,
           cacheMaximumSize);
       this.commitHook = commitHook;
+      this.overwriteMode = flinkWriteConf.overwriteMode();
     }
 
     @Override
@@ -906,7 +1006,7 @@ public Committer<DynamicCommittable> createCommitter(CommitterInitContext contex
           commitHook,
           CATALOG_EXTENSION.catalogLoader().loadCatalog(),
           Collections.emptyMap(),
-          false,
+          overwriteMode,
           10,
           "sinkId",
           new DynamicCommitterMetrics(context.metricGroup()));