From 7b5b0da7101b45d6509cfbf825d04ec7cc5dba2c Mon Sep 17 00:00:00 2001
From: Maximilian Michels
Date: Thu, 18 Sep 2025 13:28:46 +0200
Subject: [PATCH] [FLINK-38370] Ensure CommitterOperator commits all pending
 committables in batch mode

In #26433, we removed the EOI marker in the form of Long.MAX_VALUE as the
checkpoint id. Since streaming pipelines can continue to checkpoint even
after their respective operators have shut down, it is not safe to use a
constant checkpoint id there, as this can lead to duplicate commits.
However, batch pipelines commit only once, on job shutdown, so any
checkpoint id suffices in that scenario: all pending committables must be
processed by the CommitterOperator when the operator shuts down, and no
further checkpoints will take place. Various connectors rely on this
behavior, and I don't see any drawbacks in keeping it for batch pipelines.
---
 .../operators/sink/CommitterOperator.java     |  2 +-
 .../sink/SinkV2CommitterOperatorTest.java     | 45 +++++++++++++++++--
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 4a640fedf7598..7fecc8abffeee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -151,7 +151,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     public void endInput() throws Exception {
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
+            commitAndEmitCheckpoints(Long.MAX_VALUE);
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 03a0c791b3a13..0025acdf2f369 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -36,11 +36,14 @@
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
 import java.util.function.IntSupplier;
+import java.util.stream.Stream;
 
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
@@ -85,9 +88,17 @@ SinkAndCounters sinkWithoutPostCommit() {
                 () -> committer.successfulCommits);
     }
 
+    static Stream<Arguments> testParameters() {
+        return Stream.of(
+                Arguments.of(true, false),
+                Arguments.of(true, true),
+                Arguments.of(false, false),
+                Arguments.of(false, true));
+    }
+
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
+    @MethodSource("testParameters")
+    void testEmitCommittables(boolean withPostCommitTopology, boolean isBatch) throws Exception {
         SinkAndCounters sinkAndCounters;
         if (withPostCommitTopology) {
             // Insert global committer to simulate post commit topology
@@ -99,7 +110,8 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
                         CommittableMessage<String>, CommittableMessage<String>>
                 testHarness =
                         new OneInputStreamOperatorTestHarness<>(
-                                new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
+                                new CommitterOperatorFactory<>(
+                                        sinkAndCounters.sink, isBatch, true));
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -127,6 +139,33 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
         testHarness.close();
     }
 
+    @Test
+    void testEmitCommittablesBatch() throws Exception {
+        SinkAndCounters sinkAndCounters = sinkWithoutPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false));
+        testHarness.open();
+
+        // Test that all committables up to Long.MAX_VALUE are committed.
+        long checkpointId = Long.MAX_VALUE;
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, checkpointId, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> committableWithLineage =
+                new CommittableWithLineage<>("1", checkpointId, 1);
+        testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+        testHarness.endInput();
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
+        assertThat(testHarness.getOutput()).isEmpty();
+
+        testHarness.close();
+    }
+
     @Test
     void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();