From de906fa45d421db36bc6418c67c92fbee9ee0146 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Wed, 9 Apr 2025 15:29:03 +0200 Subject: [PATCH 1/2] [FLINK-37605][runtime] Clarify contract of endInput (cherry picked from commit 3342c231a2877f48540322f577527ec8bb765b08) --- .../flink/streaming/api/operators/BoundedOneInput.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java index 5a745238d69ee..cd02789219324 100755 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java @@ -35,6 +35,12 @@ public interface BoundedOneInput { /** * It is notified that no more data will arrive from the input. * + *

Stateful operators need to be aware that a restart with rescaling may occur after + * receiving this notification. A changed source split assignment may mean that the same + * subtask of this operator that already received endInput has its state snapshotted after + * endInput and then receives new data after the restart. Hence, the state should not contain + * any finalization that would make it impossible to process new data. + * *

WARNING: It is not safe to use this method to commit any transactions or other side * effects! You can use this method to flush any buffered data that can later on be committed * e.g. in a {@link StreamOperator#notifyCheckpointComplete(long)}. From c12d73faffa497c5fa27debf1f5c855ef5b5fd88 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Thu, 3 Apr 2025 09:32:00 +0200 Subject: [PATCH 2/2] [FLINK-37605][runtime] Infer checkpoint id on endInput in sink So far, we used a special value for the final checkpoint on endInput. However, as shown in the description of this ticket, final doesn't mean final. Hence, multiple committables with EOI could be created at different times. With this commit, we stop using a special value for such committables and instead try to guess the checkpoint id of the next checkpoint. There are various factors that influence the checkpoint id but we can mostly ignore them all because we just need to pick a checkpoint id that is - higher than all checkpoint ids of the previous, successful checkpoints of this attempt - higher than the checkpoint id of the restored checkpoint - lower than any future checkpoint id. Hence, we just remember the last observed checkpoint id (initialized with max(0, restored id)), and use last id + 1 for endInput. Naturally, multiple endInput calls happening through restarts will result in unique checkpoint ids. Note that aborted checkpoints before endInput may result in diverged checkpoint ids across subtasks. However, each of these ids satisfies the above requirements and any id of endInput1 will be smaller than any id of endInput2. Thus, diverged checkpoint ids will not impact correctness at all. 
(cherry picked from commit 93025452714570a4d461519510375dd72af3a2c0) --- .../compactor/operator/CompactorOperator.java | 12 +++- .../connector/sink2/CommittableMessage.java | 6 +- .../operators/sink/CommitterOperator.java | 23 ++++---- .../operators/sink/SinkWriterOperator.java | 58 +++---------------- .../CheckpointCommittableManagerImpl.java | 1 + .../committables/CommittableCollector.java | 11 ---- .../sink2/GlobalCommitterOperatorTest.java | 33 ----------- .../sink/CommitterOperatorTestBase.java | 40 ------------- .../sink/SinkWriterOperatorTestBase.java | 40 ++++++++++++- .../CommittableCollectorTest.java | 20 ------- .../AbstractStreamOperatorTestHarness.java | 12 +++- 11 files changed, 81 insertions(+), 175 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java index 0cb573d466f69..69ee4b07914b1 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java @@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -106,6 +107,8 @@ public class CompactorOperator // submitted again while restoring private ListState>> remainingRequestsState; + private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 
1; + public CompactorOperator( FileCompactStrategy strategy, SimpleVersionedSerializer committableSerializer, @@ -136,15 +139,16 @@ public void processElement(StreamRecord element) throws Except @Override public void endInput() throws Exception { // add collecting requests into the final snapshot - checkpointRequests.put(CommittableMessage.EOI, collectingRequests); + long checkpointId = lastKnownCheckpointId + 1; + checkpointRequests.put(checkpointId, collectingRequests); collectingRequests = new ArrayList<>(); // submit all requests and wait until they are done - submitUntil(CommittableMessage.EOI); + submitUntil(checkpointId); assert checkpointRequests.isEmpty(); getAllTasksFuture().join(); - emitCompacted(CommittableMessage.EOI); + emitCompacted(checkpointId); assert compactingRequests.isEmpty(); } @@ -222,6 +226,8 @@ private void submitUntil(long checkpointId) { } private void emitCompacted(long checkpointId) throws Exception { + lastKnownCheckpointId = checkpointId; + List compacted = new ArrayList<>(); Iterator>>> iter = compactingRequests.iterator(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java index 7db0c29ecc6bc..4a2049dbce869 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java @@ -28,8 +28,10 @@ public interface CommittableMessage { /** * Special value for checkpointId for the end of input in case of batch commit or final * checkpoint. + * + * @deprecated the special value is not used anymore at all (remove with Flink 2.2) */ - long EOI = Long.MAX_VALUE; + @Deprecated long EOI = Long.MAX_VALUE; /** The subtask that created this committable. 
*/ int getSubtaskId(); @@ -49,6 +51,8 @@ default OptionalLong getCheckpointId() { /** * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch * commit. + * + * @deprecated the special value EOI is not used anymore */ long getCheckpointIdOrEOI(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 10ae86cf10de0..6954ad24e36f2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -51,7 +52,6 @@ import java.util.Collections; import java.util.OptionalLong; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -76,11 +76,9 @@ class CommitterOperator extends AbstractStreamOperator committer; private CommittableCollector committableCollector; - private long lastCompletedCheckpointId = -1; + private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; private int maxRetries; - private boolean endInput = false; - /** The operator's state descriptor. 
*/ private static final ListStateDescriptor STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>( @@ -131,11 +129,11 @@ public void initializeState(StateInitializationContext context) throws Exception getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), metricGroup)); - if (context.isRestored()) { + if (checkpointId.isPresent()) { committableCollectorState.get().forEach(cc -> committableCollector.merge(cc)); lastCompletedCheckpointId = checkpointId.getAsLong(); // try to re-commit recovered transactions as quickly as possible - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId); } } @@ -148,24 +146,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void endInput() throws Exception { - endInput = true; if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId)); } - private void commitAndEmitCheckpoints() throws IOException, InterruptedException { - long completedCheckpointId = endInput ? 
EOI : lastCompletedCheckpointId; + private void commitAndEmitCheckpoints(long checkpointId) + throws IOException, InterruptedException { + lastCompletedCheckpointId = checkpointId; for (CheckpointCommittableManager checkpointManager : - committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { + committableCollector.getCheckpointCommittablesUpTo(checkpointId)) { // ensure that all committables of the first checkpoint are fully committed before // attempting the next committable commitAndEmit(checkpointManager); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 7fb78f37c0d81..31397f48b3705 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; @@ -52,8 +51,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; - import javax.annotation.Nullable; import java.io.IOException; @@ -62,6 +59,7 @@ import java.util.List; import java.util.OptionalLong; +import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID; import static 
org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -91,13 +89,6 @@ class SinkWriterOperator extends AbstractStreamOperator committableSerializer; private final List legacyCommittables = new ArrayList<>(); - /** - * Used to remember that EOI has already happened so that we don't emit the last committables of - * the final checkpoints twice. - */ - private static final ListStateDescriptor END_OF_INPUT_STATE_DESC = - new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); - /** The runtime information of the input element. */ private final Context context; @@ -115,10 +106,7 @@ class SinkWriterOperator extends AbstractStreamOperator endOfInputState; + private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1; SinkWriterOperator( Sink sink, @@ -146,8 +134,10 @@ class SinkWriterOperator extends AbstractStreamOperator> legacyCommitterState = new SimpleVersionedListState<>( @@ -161,41 +151,12 @@ public void initializeState(StateInitializationContext context) throws Exception } sinkWriter = writerStateHandler.createWriter(initContext, context); - - if (emitDownstream) { - // Figure out if we have seen end of input before and if we can suppress creating - // transactions and sending them downstream to the CommitterOperator. We have the - // following - // cases: - // 1. state is empty: - // - First time initialization - // - Restoring from a previous version of Flink that didn't handle EOI - // - Upscaled from a final or regular checkpoint - // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries - // that the CommitterOperator needs to handle. - // 2. state is not empty: - // - This implies Flink restores from a version that handles EOI. 
- // - If there is one entry, no rescaling happened (for this subtask), so if it's true, - // we recover from a final checkpoint (for this subtask) and can ignore another EOI - // else we have a regular checkpoint. - // - If there are multiple entries, Flink downscaled, and we need to check if all are - // true and do the same as above. As soon as one entry is false, we regularly start - // the writer and potentially emit duplicate summaries if we indeed recovered from a - // final checkpoint. - endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); - ArrayList previousState = Lists.newArrayList(endOfInputState.get()); - endOfInput = !previousState.isEmpty() && !previousState.contains(false); - } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); writerStateHandler.snapshotState(context.getCheckpointId()); - if (endOfInputState != null) { - endOfInputState.clear(); - endOfInputState.add(this.endOfInput); - } } @Override @@ -225,17 +186,16 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void endInput() throws Exception { + LOG.info("Received endInput"); if (!endOfInput) { endOfInput = true; - if (endOfInputState != null) { - endOfInputState.add(true); - } sinkWriter.flush(true); - emitCommittables(CommittableMessage.EOI); + emitCommittables(lastKnownCheckpointId + 1); } } private void emitCommittables(long checkpointId) throws IOException, InterruptedException { + lastKnownCheckpointId = checkpointId; if (!emitDownstream) { // To support SinkV1 topologies with only a writer we have to call prepareCommit // although no committables are forwarded diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 
da4491cda617d..816bd55543e51 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -95,6 +95,7 @@ void addSummary(CommittableSummary summary) { summary.getSubtaskId(), checkpointId, metricGroup); + // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2) if (checkpointId == CommittableMessage.EOI) { SubtaskCommittableManager merged = subtasksCommittableManagers.merge( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index 4e49d73279e48..96585a632d107 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -33,7 +33,6 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Objects; -import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; @@ -49,7 +48,6 @@ */ @Internal public class CommittableCollector { - private static final long EOI = Long.MAX_VALUE; /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */ private final NavigableMap> checkpointCommittables; @@ -143,15 +141,6 @@ public Collection> getCheckpointCo return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values()); } - /** - * Returns {@link CheckpointCommittableManager} belonging to the last input. 
- * - * @return {@link CheckpointCommittableManager} - */ - public Optional> getEndOfInputCommittable() { - return Optional.ofNullable(checkpointCommittables.get(EOI)); - } - /** * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are * either committed or failed. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java index 641a651e2e406..24f9422d30b72 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java @@ -32,7 +32,6 @@ import java.util.Collection; import java.util.List; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class GlobalCommitterOperatorTest { @@ -138,38 +137,6 @@ void testStateRestore() throws Exception { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception { - final MockCommitter committer = new MockCommitter(); - final OneInputStreamOperatorTestHarness, Void> testHarness = - createTestHarness(committer, commitOnInput); - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage first = new CommittableWithLineage<>(1, EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage 
second = new CommittableWithLineage<>(2, EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - // commitOnInput implies that the global committer is not using notifyCheckpointComplete - if (commitOnInput) { - assertThat(committer.committed).containsExactly(1, 2); - } else { - assertThat(committer.committed).isEmpty(); - testHarness.notifyOfCompletedCheckpoint(EOI); - assertThat(committer.committed).containsExactly(1, 2); - } - - assertThat(testHarness.getOutput()).isEmpty(); - } - private OneInputStreamOperatorTestHarness, Void> createTestHarness( Committer committer, boolean commitOnInput) throws Exception { return new OneInputStreamOperatorTestHarness<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index 756ea0c8022f0..c8b37943846af 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -35,7 +35,6 @@ import java.util.function.IntSupplier; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; import static org.assertj.core.api.Assertions.as; @@ -126,45 +125,6 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { testHarness.close(); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage, 
CommittableMessage> - testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage first = new CommittableWithLineage<>("1", EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage second = new CommittableWithLineage<>("1", EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - testHarness.endInput(); - if (!isBatchMode) { - assertThat(testHarness.getOutput()).isEmpty(); - // notify final checkpoint complete - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(0) - .hasOverallCommittables(2) - .hasPendingCommittables(0); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - @Test void testStateRestore() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index 46914441c139d..57c03df322031 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -66,7 +66,6 @@ import java.util.stream.Collectors; import 
static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; import static org.assertj.core.api.Assertions.as; @@ -178,7 +177,7 @@ void testEmitOnEndOfInputInBatchMode() throws Exception { testHarness.processElement(1, 1); testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); } @ParameterizedTest @@ -467,6 +466,43 @@ public SinkWriter createWriter(InitContext context) { testHarness.close(); } + @Test + void testDoubleEndOfInput() throws Exception { + InspectableSink sink = sinkWithCommitter(); + + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink()))) { + testHarness.open(); + testHarness.processElement(1, 1); + + testHarness.endInput(); + testHarness.prepareSnapshotPreBarrier(1); + snapshot = testHarness.snapshot(1, 1); + + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); + } + + final InspectableSink restoredSink = sinkWithCommitter(); + try (OneInputStreamOperatorTestHarness> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink.getSink()))) { + restoredTestHarness.setRestoredCheckpointId(1L); + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + restoredTestHarness.processElement(2, 2); + + restoredTestHarness.endInput(); + restoredTestHarness.prepareSnapshotPreBarrier(3); + restoredTestHarness.snapshot(3, 1); + + // asserts the guessed checkpoint id which needs + assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L); + } + } + 
private static void assertContextsEqual( Sink.InitContext initContext, WriterInitContext original) { assertThat(initContext.getUserCodeClassLoader().asClassLoader()) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index 6e55adcc0c572..3181c21361aa8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -24,9 +24,6 @@ import org.junit.jupiter.api.Test; -import java.util.Optional; - -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class CommittableCollectorTest { @@ -44,22 +41,5 @@ void testGetCheckpointCommittablesUpTo() { committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0, 0)); assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2); - - assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent(); - } - - @Test - void testGetEndOfInputCommittable() { - final CommittableCollector committableCollector = - new CommittableCollector<>(METRIC_GROUP); - CommittableSummary first = new CommittableSummary<>(1, 1, EOI, 1, 0, 0); - committableCollector.addMessage(first); - - Optional> endOfInputCommittable = - committableCollector.getEndOfInputCommittable(); - assertThat(endOfInputCommittable).isPresent(); - assertThat(endOfInputCommittable) - .get() - .returns(EOI, CheckpointCommittableManager::getCheckpointId); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index d689f009842e2..f484f0e3ebe69 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -182,6 +182,8 @@ public InternalTimeServiceManager create( private volatile boolean wasFailedExternally = false; + private long restoredCheckpointId = 0; + public AbstractStreamOperatorTestHarness( StreamOperator operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception { @@ -402,6 +404,10 @@ public StreamConfig getStreamConfig() { return config; } + public void setRestoredCheckpointId(long restoredCheckpointId) { + this.restoredCheckpointId = restoredCheckpointId; + } + /** Get all the output from the task. This contains StreamRecords and Events interleaved. */ public ConcurrentLinkedQueue getOutput() { return outputList; @@ -610,16 +616,16 @@ public void initializeState( jmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), jmOperatorStateHandles); - taskStateManager.setReportedCheckpointId(0); + taskStateManager.setReportedCheckpointId(restoredCheckpointId); taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, jmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, jmTaskStateSnapshot)); if (tmOperatorStateHandles != null) { TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot(); tmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), tmOperatorStateHandles); taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, tmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, tmTaskStateSnapshot)); } }