diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index e58066aac6ca..54d506b66328 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -274,26 +274,25 @@ private void replacePartitions( CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) { - // We don't commit the merged result into a single transaction because for the sequential - // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied - // to data files from txn1. Committing the merged one will lead to the incorrect delete - // semantic. - for (WriteResult result : e.getValue()) { - ReplacePartitions dynamicOverwrite = - table.newReplacePartitions().scanManifestsWith(workerPool); + // Iceberg tables are unsorted, so the order of the appended data does not matter. + // Hence, we commit everything in a single snapshot. + ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool); + + for (List<WriteResult> writeResults : pendingResults.values()) { + for (WriteResult result : writeResults) { Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); - commitOperation( - table, - branch, - dynamicOverwrite, - summary, - "dynamic partition overwrite", - newFlinkJobId, - operatorId, - e.getKey()); } } + + commitOperation( + table, + branch, + dynamicOverwrite, + summary, + "dynamic partition overwrite", + newFlinkJobId, + operatorId, + pendingResults.lastKey()); } private void commitDeltaTxn( @@ -304,11 +303,11 @@ private void commitDeltaTxn( String newFlinkJobId, String operatorId) { for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) { - // We don't commit the merged result into a single transaction because for the sequential - // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied - // to data files from txn1. Committing the merged one will lead to the incorrect delete - // semantic. - for (WriteResult result : e.getValue()) { + long checkpointId = e.getKey(); + List<WriteResult> writeResults = e.getValue(); + + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + for (WriteResult result : writeResults) { // Row delta validations are not needed for streaming changes that write equality deletes. // Equality deletes are applied to data in all previous sequence numbers, so retries may // push deletes further in the future, but do not affect correctness. Position deletes @@ -316,13 +315,17 @@ private void commitDeltaTxn( // being added in this commit. There is no way for data files added along with the delete // files to be concurrently removed, so there is no need to validate the files referenced by // the position delete files that are being committed. - RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); - commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey()); } + + // Every Flink checkpoint contains a set of independent changes which can be committed + // together. While it is technically feasible to combine append-only data across checkpoints, + // for the sake of simplicity, we do not implement this (premature) optimization. Multiple + // pending checkpoints are rare here; they only occur with very short checkpoint + // intervals or when concurrent checkpointing is enabled. + commitOperation( + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 99a546536208..f5387aee882a 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -19,8 +19,13 @@ package org.apache.iceberg.flink.sink.dynamic; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; +import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Map; import org.apache.flink.api.common.JobID; import org.apache.flink.api.connector.sink2.Committer.CommitRequest; @@ -30,19 +35,24 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileMetadata; import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.sink.CommitSummary; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -74,6 +84,39 @@ class TestDynamicCommitter { )) .build(); + private static final DataFile DATA_FILE_2 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-2.parquet") + .withFileSizeInBytes(0) + .withMetrics( + new Metrics( + 24L, + null, // no column sizes + ImmutableMap.of(1, 3L), // value count + ImmutableMap.of(1, 0L), // null count + null, + ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds + ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds + )) + .build(); + + private static final DeleteFile DELETE_FILE = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-3.parquet") + .withFileSizeInBytes(0) + .withMetrics( + new Metrics( + 24L, + null, // no column sizes + ImmutableMap.of(1, 3L), // value count + ImmutableMap.of(1, 0L), // null count + null, + ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds + ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds + )) + 
.ofPositionDeletes() + .build(); + @BeforeEach void before() { catalog = CATALOG_EXTENSION.catalog(); @@ -162,60 +205,57 @@ void testCommit() throws Exception { Snapshot first = Iterables.getFirst(table1.snapshots(), null); assertThat(first.summary()) .containsAllEntriesOf( - (Map) - ImmutableMap.builder() - .put("added-data-files", "1") - .put("added-records", "42") - .put("changed-partition-count", "1") - .put("flink.job-id", jobId) - .put("flink.max-committed-checkpoint-id", "" + checkpointId) - .put("flink.operator-id", operatorId) - .put("total-data-files", "1") - .put("total-delete-files", "0") - .put("total-equality-deletes", "0") - .put("total-files-size", "0") - .put("total-position-deletes", "0") - .put("total-records", "42") - .build()); + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); Snapshot second = Iterables.get(table1.snapshots(), 1, null); assertThat(second.summary()) .containsAllEntriesOf( - (Map) - ImmutableMap.builder() - .put("added-data-files", "1") - .put("added-records", "42") - .put("changed-partition-count", "1") - .put("flink.job-id", jobId) - .put("flink.max-committed-checkpoint-id", "" + checkpointId) - .put("flink.operator-id", operatorId) - .put("total-data-files", "1") - .put("total-delete-files", "0") - .put("total-equality-deletes", "0") - .put("total-files-size", "0") - .put("total-position-deletes", "0") - .put("total-records", "42") - .build()); + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); table2.refresh(); assertThat(table2.snapshots()).hasSize(1); Snapshot third = Iterables.getFirst(table2.snapshots(), null); assertThat(third.summary()) .containsAllEntriesOf( - (Map) - ImmutableMap.builder() - .put("added-data-files", "1") - .put("added-records", "42") - .put("changed-partition-count", "1") - .put("flink.job-id", jobId) - .put("flink.max-committed-checkpoint-id", "" + checkpointId) - .put("flink.operator-id", operatorId) - .put("total-data-files", "1") - .put("total-delete-files", "0") - .put("total-equality-deletes", "0") - .put("total-files-size", "0") - .put("total-position-deletes", "0") - .put("total-records", "42") - .build()); + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); } @Test @@ -277,21 +317,276 @@ void testAlreadyCommitted() throws Exception { Snapshot first = 
Iterables.getFirst(table1.snapshots(), null); assertThat(first.summary()) .containsAllEntriesOf( - (Map) - ImmutableMap.builder() - .put("added-data-files", "1") - .put("added-records", "42") - .put("changed-partition-count", "1") - .put("flink.job-id", jobId) - .put("flink.max-committed-checkpoint-id", "" + checkpointId) - .put("flink.operator-id", operatorId) - .put("total-data-files", "1") - .put("total-delete-files", "0") - .put("total-equality-deletes", "0") - .put("total-files-size", "0") - .put("total-position-deletes", "0") - .put("total-records", "42") - .build()); + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + } + + @Test + void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { + Table table = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table.snapshots()).isEmpty(); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + WriteTarget writeTarget1 = + new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2)); + // writeTarget2 has a different schema + WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet()); + // Different branch for writeTarget3 + WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, Sets.newHashSet()); + + WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId1 = 1; + final int checkpointId2 = 2; + + byte[] deltaManifest1 = + aggregator.writeToManifest( + writeTarget1, + Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), + checkpointId1); + + CommitRequest commitRequest1 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); + + byte[] deltaManifest2 = + aggregator.writeToManifest( + writeTarget2, + Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), + checkpointId1); + + CommitRequest commitRequest2 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId1)); + + byte[] deltaManifest3 = + aggregator.writeToManifest( + writeTarget3, + Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult2)), + checkpointId2); + + CommitRequest commitRequest3 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter dynamicCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + 
overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3)); + + table.refresh(); + // Three committables produce two snapshots, one for each table / branch combination. + assertThat(table.snapshots()).hasSize(2); + + Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null); + assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId()); + assertThat(snapshot1.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "2") + .put("added-records", "66") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId1) + .put("flink.operator-id", operatorId) + .put("total-data-files", "2") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "66") + .build()); + + Snapshot snapshot2 = Iterables.get(table.snapshots(), 1); + assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId()); + assertThat(snapshot2.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "24") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId2) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "24") + .build()); + } + + @Test + void testTableBranchAtomicCommitWithFailures() throws Exception { + Table table = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table.snapshots()).isEmpty(); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet()); + // writeTarget2 has a different schema + WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); + WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); + + WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + WriteResult writeResult2 = WriteResult.builder().addDeleteFiles(DELETE_FILE).build(); + WriteResult writeResult3 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId1 = 1; + final int checkpointId2 = 2; + + byte[] deltaManifest1 = + aggregator.writeToManifest( + writeTarget1, + Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), + checkpointId1); + + CommitRequest commitRequest1 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); + + byte[] deltaManifest2 = + aggregator.writeToManifest( + writeTarget2, + Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), + checkpointId2); + + CommitRequest commitRequest2 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId2)); + + byte[] deltaManifest3 = 
aggregator.writeToManifest( + writeTarget3, + Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult3)), + checkpointId2); + + CommitRequest commitRequest3 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + + // Use a special hook to fail at various stages of the commit operation + CommitHook commitHook = new FailBeforeAndAfterCommit(); + DynamicCommitter dynamicCommitter = + new CommitHookEnabledDynamicCommitter( + commitHook, + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + ThrowingCallable commitExecutable = + () -> + dynamicCommitter.commit( + Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3)); + + // First fail pre-commit + assertThatThrownBy(commitExecutable); + assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue(); + + // Second fail during commit + assertThatThrownBy(commitExecutable); + assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue(); + + // Third fail after commit + assertThatThrownBy(commitExecutable); + assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue(); + + // Finally, the commit must go through, although it is a no-op because the third failure + // occurred directly after the commit finished. + try { + commitExecutable.call(); + } catch (Throwable e) { + fail("Should not have thrown an exception"); + } + + table.refresh(); + // Three committables, but only two snapshots: WriteResults from different checkpoints are not + // combined, because writeResult2 contains a delete file. 
+ assertThat(table.snapshots()).hasSize(2); + + Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null); + assertThat(snapshot1.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId1) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + + Snapshot snapshot2 = Iterables.get(table.snapshots(), 1); + assertThat(snapshot2.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId2) + .put("flink.operator-id", operatorId) + .put("total-data-files", "2") + .put("total-delete-files", "1") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "24") + .put("total-records", "84") + .build()); } @Test @@ -361,21 +656,109 @@ void testReplacePartitions() throws Exception { Snapshot latestSnapshot = Iterables.getLast(table1.snapshots()); assertThat(latestSnapshot.summary()) .containsAllEntriesOf( - (Map) - ImmutableMap.builder() - .put("replace-partitions", "true") - .put("added-data-files", "1") - .put("added-records", "42") - .put("changed-partition-count", "1") - .put("flink.job-id", jobId) - .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId + 1)) - .put("flink.operator-id", operatorId) - .put("total-data-files", "1") - .put("total-delete-files", "0") - .put("total-equality-deletes", "0") - .put("total-files-size", "0") - .put("total-position-deletes", "0") - .put("total-records", "42") - .build()); + ImmutableMap.builder() + .put("replace-partitions", "true") + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId + 1)) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + } + + interface CommitHook extends Serializable { + void beforeCommit(); + + void duringCommit(); + + void afterCommit(); + } + + static class FailBeforeAndAfterCommit implements CommitHook { + + static boolean failedBeforeCommit; + static boolean failedDuringCommit; + static boolean failedAfterCommit; + + FailBeforeAndAfterCommit() { + reset(); + } + + @Override + public void beforeCommit() { + if (!failedBeforeCommit) { + failedBeforeCommit = true; + throw new RuntimeException("Failing before commit"); + } + } + + @Override + public void duringCommit() { + if (!failedDuringCommit) { + failedDuringCommit = true; + throw new RuntimeException("Failing during commit"); + } + } + + @Override + public void afterCommit() { + if (!failedAfterCommit) { + failedAfterCommit = true; + throw new RuntimeException("Failing after commit"); + } + } + + static void reset() { + failedBeforeCommit = false; + failedDuringCommit = false; + failedAfterCommit = false; + } + } + + static class CommitHookEnabledDynamicCommitter extends 
DynamicCommitter { + private final CommitHook commitHook; + + CommitHookEnabledDynamicCommitter( + CommitHook commitHook, + Catalog catalog, + Map snapshotProperties, + boolean replacePartitions, + int workerPoolSize, + String sinkId, + DynamicCommitterMetrics committerMetrics) { + super( + catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics); + this.commitHook = commitHook; + } + + @Override + public void commit(Collection> commitRequests) + throws IOException, InterruptedException { + commitHook.beforeCommit(); + super.commit(commitRequests); + commitHook.afterCommit(); + } + + @Override + void commitOperation( + Table table, + String branch, + SnapshotUpdate operation, + CommitSummary summary, + String description, + String newFlinkJobId, + String operatorId, + long checkpointId) { + super.commitOperation( + table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId); + commitHook.duringCommit(); + } } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index b61e297cc140..20fae212b48e 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.Serializable; import java.time.Duration; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -56,7 +55,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -69,8 +67,9 @@ import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.flink.sink.CommitSummary; import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook; +import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit; import org.apache.iceberg.inmemory.InMemoryInputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -528,7 +527,8 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception { // Configure a Restart strategy to allow recovery Configuration configuration = new Configuration(); configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); - configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2); + // Allow max 3 retries to make up for the three failures we are simulating here + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3); configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO); env.configure(configuration); @@ -539,14 +539,15 @@ void testCommitFailedBeforeOrAfterCommit() throws Exception { new DynamicIcebergDataImpl( SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); - FailBeforeAndAfterCommit.reset(); final CommitHook commitHook = new FailBeforeAndAfterCommit(); 
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse(); + assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse(); assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse(); executeDynamicSink(rows, env, true, 1, commitHook); assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue(); + assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue(); assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue(); } @@ -569,44 +570,6 @@ void testCommitConcurrency() throws Exception { executeDynamicSink(rows, env, true, 1, commitHook); } - interface CommitHook extends Serializable { - void beforeCommit(); - - void duringCommit(); - - void afterCommit(); - } - - private static class FailBeforeAndAfterCommit implements CommitHook { - - static boolean failedBeforeCommit; - static boolean failedAfterCommit; - - @Override - public void beforeCommit() { - if (!failedBeforeCommit) { - failedBeforeCommit = true; - throw new RuntimeException("Failing before commit"); - } - } - - @Override - public void duringCommit() {} - - @Override - public void afterCommit() { - if (!failedAfterCommit) { - failedAfterCommit = true; - throw new RuntimeException("Failing before commit"); - } - } - - static void reset() { - failedBeforeCommit = false; - failedAfterCommit = false; - } - } - private static class AppendRightBeforeCommit implements CommitHook { final String tableIdentifier; @@ -734,8 +697,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { @Override public Committer createCommitter(CommitterInitContext context) { - // return super.createCommitter(context); - return new CommitHookEnabledDynamicCommitter( + return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter( commitHook, CATALOG_EXTENSION.catalogLoader().loadCatalog(), Collections.emptyMap(), @@ -746,46 +708,6 @@ public Committer createCommitter(CommitterInitContext contex } } - static class CommitHookEnabledDynamicCommitter extends DynamicCommitter { - private final CommitHook commitHook; - - CommitHookEnabledDynamicCommitter( - CommitHook commitHook, - Catalog catalog, - Map snapshotProperties, - boolean replacePartitions, - int workerPoolSize, - String sinkId, - DynamicCommitterMetrics committerMetrics) { - super( - catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics); - this.commitHook = commitHook; - } - - @Override - public void commit(Collection> commitRequests) - throws IOException, InterruptedException { - commitHook.beforeCommit(); - super.commit(commitRequests); - commitHook.afterCommit(); - } - - @Override - void commitOperation( - Table table, - String branch, - SnapshotUpdate operation, - CommitSummary summary, - String description, - String newFlinkJobId, - String operatorId, - long checkpointId) { - commitHook.duringCommit(); - super.commitOperation( - table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId); - } - } - private void verifyResults(List dynamicData) throws IOException { // Calculate the expected result Map, List> expectedData = Maps.newHashMap();