diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 6954ad24e36f2..2f766a341cbb0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -148,7 +148,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { public void endInput() throws Exception { if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); + commitAndEmitCheckpoints(Long.MAX_VALUE); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index c8b37943846af..ee58c8b94acb7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -276,6 +276,33 @@ void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Ex assertThat(testHarness.getOutput()).hasSize(2); } + @Test + void testEmitCommittablesBatch() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithoutPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false)); + testHarness.open(); + + // Test that all committables up to Long.MAX_VALUE are committed. + long checkpointId = Long.MAX_VALUE; + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, checkpointId, 1, 0, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage committableWithLineage = + new CommittableWithLineage<>("1", checkpointId, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.close(); + } + private OneInputStreamOperatorTestHarness< CommittableMessage, CommittableMessage> createTestHarness(