diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 7894428a781f..218cc406a733 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -591,6 +591,71 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { .build()); } + @Test + void testCommitDeltaTxnWithAppendFiles() throws Exception { + Table table = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table.snapshots()).isEmpty(); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + WriteTarget writeTarget1 = + new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2)); + WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet()); + + WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId = 1; + + byte[] deltaManifest1 = + aggregator.writeToManifest( + writeTarget1, + Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), + checkpointId); + + CommitRequest commitRequest1 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + + byte[] deltaManifest2 = + aggregator.writeToManifest( + writeTarget2, + Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), + checkpointId); + + CommitRequest commitRequest2 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter dynamicCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2)); + + table.refresh(); + assertThat(table.snapshots()).hasSize(1); + + Snapshot snapshot = Iterables.getFirst(table.snapshots(), null); + assertThat(snapshot.operation()).isEqualTo("append"); + } + @Test void testReplacePartitions() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));