diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java index fa4e2b3d54b41..ae2ffe7793aef 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java @@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -107,6 +108,8 @@ public class CompactorOperator // submitted again while restoring private ListState>> remainingRequestsState; + private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; + public CompactorOperator( StreamOperatorParameters> parameters, FileCompactStrategy strategy, @@ -139,15 +142,16 @@ public void processElement(StreamRecord element) throws Except @Override public void endInput() throws Exception { // add collecting requests into the final snapshot - checkpointRequests.put(CommittableMessage.EOI, collectingRequests); + long checkpointId = lastKnownCheckpointId + 1; + checkpointRequests.put(checkpointId, collectingRequests); collectingRequests = new ArrayList<>(); // submit all requests and wait until they are done - submitUntil(CommittableMessage.EOI); + submitUntil(checkpointId); assert checkpointRequests.isEmpty(); getAllTasksFuture().join(); - emitCompacted(CommittableMessage.EOI); + emitCompacted(checkpointId); assert compactingRequests.isEmpty(); } @@ -225,6 +229,8 @@ private void submitUntil(long checkpointId) { } private void emitCompacted(long checkpointId) throws Exception { + lastKnownCheckpointId = checkpointId; + List compacted = new ArrayList<>(); Iterator>>> iter = compactingRequests.iterator(); @@ -252,7 +258,6 @@ private void emitCompacted(long checkpointId) throws Exception { getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), checkpointId, compacted.size(), - compacted.size(), 0); output.collect(new StreamRecord<>(summary)); for (FileSinkCommittable c : compacted) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java index df77f65878069..e0978e0ffcf50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java @@ -26,7 +26,10 @@ public interface CommittableMessage { /** * Special value for checkpointId for the end of input in case of batch commit or final * checkpoint. + * + * @deprecated the special value is not used anymore at all (remove with Flink 2.2) */ + @Deprecated(forRemoval = true) long EOI = Long.MAX_VALUE; /** The subtask that created this committable. 
*/ @@ -35,6 +38,13 @@ public interface CommittableMessage { /** * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch * commit. + * + * @deprecated the special value EOI is not used anymore */ - long getCheckpointIdOrEOI(); + @Deprecated(forRemoval = true) + default long getCheckpointIdOrEOI() { + return getCheckpointId(); + } + + long getCheckpointId(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java index 252b10fadf40a..7496013b04661 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java @@ -42,7 +42,7 @@ public class CommittableSummary implements CommittableMessage { /** The number of committables coming from the given subtask in the particular checkpoint. */ private final int numberOfCommittables; - @Deprecated + @Deprecated(forRemoval = true) /** The number of committables that have not been successfully committed. */ private final int numberOfPendingCommittables; @@ -88,7 +88,7 @@ public int getNumberOfSubtasks() { return numberOfSubtasks; } - public long getCheckpointIdOrEOI() { + public long getCheckpointId() { return checkpointId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java index 819b4fdfc4fe7..6641a352885e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java @@ -50,7 +50,7 @@ public int getSubtaskId() { return subtaskId; } - public long getCheckpointIdOrEOI() { + public long getCheckpointId() { return checkpointId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java index 5a745238d69ee..cd02789219324 100755 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java @@ -35,6 +35,12 @@ public interface BoundedOneInput { /** * It is notified that no more data will arrive from the input. * + *
<p>Stateful operators need to be aware that a restart with rescaling may occur after + * receiving this notification. A changed source split assignment may imply that the same + * subtask of this operator that received endInput has its state after endInput snapshotted + * and will receive new data after the restart. Hence, the state should not contain any finalization + * that would make it impossible to process new data. + * *
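+ *
+ * <p>For illustration only (a hypothetical sketch, not part of this contract; it assumes a
+ * {@code buffer} list and an {@code output} collector field on the operator), an
+ * implementation that flushes without finalizing its state could look like:
+ *
+ * <pre>{@code
+ * public void endInput() throws Exception {
+ *     // flush buffered elements; committing side effects must still wait for
+ *     // notifyCheckpointComplete
+ *     for (String element : buffer) {
+ *         output.collect(new StreamRecord<>(element));
+ *     }
+ *     buffer.clear();
+ *     // deliberately no "finished" marker in state: a restart with rescaling
+ *     // may deliver new input to this subtask
+ * }
+ * }</pre>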
<p>
WARNING: It is not safe to use this method to commit any transactions or other side * effects! You can use this method to flush any buffered data that can later on be committed * e.g. in a {@link StreamOperator#notifyCheckpointComplete(long)}. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 9cd85c4001abc..2f1cffcbcce00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -51,7 +52,6 @@ import java.util.Collections; import java.util.OptionalLong; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -76,11 +76,9 @@ class CommitterOperator extends AbstractStreamOperator committer; private CommittableCollector committableCollector; - private long lastCompletedCheckpointId = -1; + private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; private int maxRetries; - private boolean endInput = false; - /** The operator's state descriptor. */ private static final ListStateDescriptor STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>( @@ -134,11 +132,11 @@ public void initializeState(StateInitializationContext context) throws Exception getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), metricGroup)); - if (context.isRestored()) { + if (checkpointId.isPresent()) { committableCollectorState.get().forEach(cc -> committableCollector.merge(cc)); lastCompletedCheckpointId = checkpointId.getAsLong(); // try to re-commit recovered transactions as quickly as possible - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId); } } @@ -151,24 +149,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void endInput() throws Exception { - endInput = true; if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId)); } - private void commitAndEmitCheckpoints() throws IOException, InterruptedException { - long completedCheckpointId = endInput ? 
EOI : lastCompletedCheckpointId; + private void commitAndEmitCheckpoints(long checkpointId) + throws IOException, InterruptedException { + lastCompletedCheckpointId = checkpointId; for (CheckpointCommittableManager checkpointManager : - committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { + committableCollector.getCheckpointCommittablesUpTo(checkpointId)) { // ensure that all committables of the first checkpoint are fully committed before // attempting the next committable commitAndEmit(checkpointManager); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index f5cfbd2aa6a3f..712d6541eec83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; @@ -54,8 +53,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.flink.shaded.guava33.com.google.common.collect.Lists; - import javax.annotation.Nullable; import java.io.IOException; @@ -64,6 +61,7 @@ import java.util.List; import java.util.OptionalLong; +import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -93,13 +91,6 @@ class SinkWriterOperator extends AbstractStreamOperator committableSerializer; private final List legacyCommittables = new ArrayList<>(); - /** - * Used to remember that EOI has already happened so that we don't emit the last committables of - * the final checkpoints twice. - */ - private static final ListStateDescriptor END_OF_INPUT_STATE_DESC = - new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); - /** The runtime information of the input element. 
*/ private final Context context; @@ -118,10 +109,7 @@ class SinkWriterOperator extends AbstractStreamOperator endOfInputState; + private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1; SinkWriterOperator( StreamOperatorParameters> parameters, @@ -164,8 +152,10 @@ protected void setup( @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId()); - if (context.isRestored()) { + OptionalLong restoredCheckpointId = context.getRestoredCheckpointId(); + WriterInitContext initContext = createInitContext(restoredCheckpointId); + if (restoredCheckpointId.isPresent()) { + lastKnownCheckpointId = restoredCheckpointId.getAsLong(); if (committableSerializer != null) { final ListState> legacyCommitterState = new SimpleVersionedListState<>( @@ -179,41 +169,12 @@ public void initializeState(StateInitializationContext context) throws Exception } sinkWriter = writerStateHandler.createWriter(initContext, context); - - if (emitDownstream) { - // Figure out if we have seen end of input before and if we can suppress creating - // transactions and sending them downstream to the CommitterOperator. We have the - // following - // cases: - // 1. state is empty: - // - First time initialization - // - Restoring from a previous version of Flink that didn't handle EOI - // - Upscaled from a final or regular checkpoint - // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries - // that the CommitterOperator needs to handle. - // 2. state is not empty: - // - This implies Flink restores from a version that handles EOI. - // - If there is one entry, no rescaling happened (for this subtask), so if it's true, - // we recover from a final checkpoint (for this subtask) and can ignore another EOI - // else we have a regular checkpoint. - // - If there are multiple entries, Flink downscaled, and we need to check if all are - // true and do the same as above. As soon as one entry is false, we regularly start - // the writer and potentially emit duplicate summaries if we indeed recovered from a - // final checkpoint. 
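// A condensed sketch (a hypothetical simplification of the changes in this diff, not the
// literal implementation): instead of the persisted end-of-input flag removed below, the
// operator remembers the last checkpoint id it emitted committables for and derives the
// final checkpoint id from it in endInput.
//
//     private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
//
//     private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
//         lastKnownCheckpointId = checkpointId; // remember for a possible endInput
//         // ... emit CommittableSummary / CommittableWithLineage for checkpointId ...
//     }
//
//     public void endInput() throws Exception {
//         if (!endOfInput) {
//             endOfInput = true;
//             sinkWriter.flush(true);
//             // attribute the final committables to the next regular checkpoint id,
//             // replacing the former EOI sentinel (Long.MAX_VALUE)
//             emitCommittables(lastKnownCheckpointId + 1);
//         }
//     }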
- endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); - ArrayList previousState = Lists.newArrayList(endOfInputState.get()); - endOfInput = !previousState.isEmpty() && !previousState.contains(false); - } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); writerStateHandler.snapshotState(context.getCheckpointId()); - if (endOfInputState != null) { - endOfInputState.clear(); - endOfInputState.add(this.endOfInput); - } } @Override @@ -243,17 +204,16 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void endInput() throws Exception { + LOG.info("Received endInput"); if (!endOfInput) { endOfInput = true; - if (endOfInputState != null) { - endOfInputState.add(true); - } sinkWriter.flush(true); - emitCommittables(CommittableMessage.EOI); + emitCommittables(lastKnownCheckpointId + 1); } } private void emitCommittables(long checkpointId) throws IOException, InterruptedException { + lastKnownCheckpointId = checkpointId; if (!emitDownstream) { // To support SinkV1 topologies with only a writer we have to call prepareCommit // although no committables are forwarded diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 6aa7401a00a42..2118874d77741 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -95,6 +95,7 @@ void addSummary(CommittableSummary summary) { summary.getSubtaskId(), checkpointId, metricGroup); + // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2) if (checkpointId == CommittableMessage.EOI) { SubtaskCommittableManager merged = subtasksCommittableManagers.merge( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index be832152ee76d..96585a632d107 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -33,7 +33,6 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Objects; -import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; @@ -49,8 +48,6 @@ */ @Internal public class CommittableCollector { - private static final long EOI = Long.MAX_VALUE; - /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */ private final NavigableMap> checkpointCommittables; @@ -144,15 +141,6 @@ public Collection> getCheckpointCo return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values()); } - /** - * Returns {@link CheckpointCommittableManager} belonging to the last input. 
- * - * @return {@link CheckpointCommittableManager} - */ - public Optional> getEndOfInputCommittable() { - return Optional.ofNullable(checkpointCommittables.get(EOI)); - } - /** * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are * either committed or failed. diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java index a73dcc24d012c..bf8da806b7bca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java @@ -35,7 +35,6 @@ import java.util.Collection; import java.util.List; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class GlobalCommitterOperatorTest { @@ -140,38 +139,6 @@ void testStateRestore() throws Exception { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception { - final MockCommitter committer = new MockCommitter(); - final OneInputStreamOperatorTestHarness, Void> testHarness = - createTestHarness(committer, commitOnInput); - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage first = new CommittableWithLineage<>(1, EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage second = new CommittableWithLineage<>(2, EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - // commitOnInput implies that the global committer is not using notifyCheckpointComplete - if (commitOnInput) { - assertThat(committer.committed).containsExactly(1, 2); - } else { - assertThat(committer.committed).isEmpty(); - testHarness.notifyOfCompletedCheckpoint(EOI); - assertThat(committer.committed).containsExactly(1, 2); - } - - assertThat(testHarness.getOutput()).isEmpty(); - } - private OneInputStreamOperatorTestHarness, Void> createTestHarness( Committer committer, boolean commitOnInput) throws Exception { return new OneInputStreamOperatorTestHarness<>( diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index 6d311170d4741..892b3785e255a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -24,9 +24,6 @@ import org.junit.jupiter.api.Test; -import java.util.Optional; - -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class CommittableCollectorTest { @@ -44,22 +41,5 @@ void testGetCheckpointCommittablesUpTo() { 
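// summaries for checkpoints 1 and 2 are added earlier in this test (outside this hunk);
// the checkpoint-3 summary below must be excluded by getCheckpointCommittablesUpTo(2)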
committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0)); assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2); - - assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent(); - } - - @Test - void testGetEndOfInputCommittable() { - final CommittableCollector committableCollector = - new CommittableCollector<>(METRIC_GROUP); - CommittableSummary first = new CommittableSummary<>(1, 1, EOI, 1, 0); - committableCollector.addMessage(first); - - Optional> endOfInputCommittable = - committableCollector.getEndOfInputCommittable(); - assertThat(endOfInputCommittable).isPresent(); - assertThat(endOfInputCommittable) - .get() - .returns(EOI, CheckpointCommittableManager::getCheckpointId); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index e600f66620424..561ed1fc4dda7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -185,6 +185,8 @@ public InternalTimeServiceManager create( private volatile boolean wasFailedExternally = false; + private long restoredCheckpointId = 0; + public AbstractStreamOperatorTestHarness( StreamOperator operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception { @@ -397,6 +399,10 @@ public StreamConfig getStreamConfig() { return config; } + public void setRestoredCheckpointId(long restoredCheckpointId) { + this.restoredCheckpointId = restoredCheckpointId; + } + /** Get all the output from the task. This contains StreamRecords and Events interleaved. 
*/ public ConcurrentLinkedQueue getOutput() { return outputList; @@ -614,16 +620,16 @@ public void initializeState( jmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), jmOperatorStateHandles); - taskStateManager.setReportedCheckpointId(0); + taskStateManager.setReportedCheckpointId(restoredCheckpointId); taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, jmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, jmTaskStateSnapshot)); if (tmOperatorStateHandles != null) { TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot(); tmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), tmOperatorStateHandles); taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, tmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, tmTaskStateSnapshot)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java index 9e3753031c0bf..8912281a08042 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java @@ -66,12 +66,17 @@ Sink simpleSink() { } Sink sinkWithCommitter() { - return TestSinkV2.newBuilder().setDefaultCommitter().build(); + return TestSinkV2.newBuilder() + .setCommitter(new TestSinkV2.DefaultCommitter<>(), TestSinkV2.RecordSerializer::new) + .build(); } Sink sinkWithCommitterAndGlobalCommitter() { - return TestSinkV2.newBuilder() - .setDefaultCommitter() + return ((TestSinkV2.Builder>) + TestSinkV2.newBuilder() + .setCommitter( + new TestSinkV2.DefaultCommitter<>(), + TestSinkV2.RecordSerializer::new)) .setWithPostCommitTopology(true) .build(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java deleted file mode 100644 index 5fdb36a3953f8..0000000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.connector.sink2.SupportsCommitter; -import org.apache.flink.configuration.SinkOptions; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; - -import org.assertj.core.api.AbstractThrowableAssert; -import org.assertj.core.api.ListAssert; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.function.IntSupplier; - -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; - -abstract class CommitterOperatorTestBase { - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitCommittables(boolean withPostCommitTopology) throws Exception { - SinkAndCounters sinkAndCounters; - if (withPostCommitTopology) { - // Insert global committer to simulate post commit topology - sinkAndCounters = sinkWithPostCommit(); - } else { - sinkAndCounters = sinkWithoutPostCommit(); - } - final OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = - new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); - - // Trigger commit - testHarness.notifyOfCompletedCheckpoint(1); - - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); - if (withPostCommitTopology) { - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - records.element(0, as(committableSummary())) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); - } else { - assertThat(testHarness.getOutput()).isEmpty(); - } - testHarness.close(); - } - - @Test - void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = createTestHarness(sinkAndCounters.sink, false, true); - testHarness.open(); - testHarness.setProcessingTime(0); - - // Only send first committable - final CommittableSummary committableSummary = - new 
CommittableSummary<>(1, 1, 1L, 2, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage first = new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(first)); - - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) - .hasMessageContaining("Trying to commit incomplete batch of committables"); - - assertThat(testHarness.getOutput()).isEmpty(); - assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); - - final CommittableWithLineage second = new CommittableWithLineage<>("2", 1L, 1); - testHarness.processElement(new StreamRecord<>(second)); - - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); - - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage first = new CommittableWithLineage<>("1", EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage second = new CommittableWithLineage<>("1", EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - testHarness.endInput(); - if (!isBatchMode) { - assertThat(testHarness.getOutput()).isEmpty(); - // notify final checkpoint complete - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(0) - .hasOverallCommittables(2); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - - @Test - void testStateRestore() throws Exception { - - final int originalSubtaskId = 0; - final int subtaskIdAfterRecovery = 9; - - final OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = - createTestHarness( - sinkWithPostCommitWithRetry().sink, - false, - true, - 1, - 1, - originalSubtaskId); - testHarness.open(); - - // We cannot test a different checkpoint thant 0 because when using the OperatorTestHarness - // for recovery the lastCompleted checkpoint is always reset to 0. 
- long checkpointId = 0L; - - final CommittableSummary committableSummary = - new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage first = - new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); - testHarness.processElement(new StreamRecord<>(first)); - - // another committable for the same checkpointId but from different subtask. - final CommittableSummary committableSummary2 = - new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - final CommittableWithLineage second = - new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); - testHarness.processElement(new StreamRecord<>(second)); - - final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); - assertThat(testHarness.getOutput()).isEmpty(); - testHarness.close(); - - // create new testHarness but with different parallelism level and subtaskId that original - // one. - // we will make sure that new subtaskId was used during committable recovery. - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - restored = - createTestHarness( - sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery); - - restored.initializeState(snapshot); - restored.open(); - - // Previous committables are immediately committed if possible - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); - ListAssert> records = - assertThat(restored.extractOutputValues()).hasSize(3); - CommittableSummaryAssert objectCommittableSummaryAssert = - records.element(0, as(committableSummary())) - .hasCheckpointId(checkpointId) - .hasFailedCommittables(0) - .hasSubtaskId(subtaskIdAfterRecovery); - objectCommittableSummaryAssert.hasOverallCommittables(2); - - // Expect the same checkpointId that the original snapshot was made with. 
- records.element(1, as(committableWithLineage())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasCommittable(first.getCommittable()); - records.element(2, as(committableWithLineage())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasCommittable(second.getCommittable()); - restored.close(); - } - - @ParameterizedTest - @ValueSource(ints = {0, 1}) - void testNumberOfRetries(int numRetries) throws Exception { - try (OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = - createTestHarness( - sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { - testHarness - .getStreamConfig() - .getConfiguration() - .set(SinkOptions.COMMITTER_RETRIES, numRetries); - testHarness.open(); - - long ckdId = 1L; - testHarness.processElement( - new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); - testHarness.processElement( - new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); - AbstractThrowableAssert throwableAssert = - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); - if (numRetries == 0) { - throwableAssert.hasMessageContaining("Failed to commit 1 committables"); - } else { - throwableAssert.doesNotThrowAnyException(); - } - } - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - - try (OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = - new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>( - sinkAndCounters.sink, false, isCheckpointingEnabled))) { - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); - - testHarness.endInput(); - - // If checkpointing enabled endInput does not emit anything because a final checkpoint - // follows - if (isCheckpointingEnabled) { - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - CommittableSummaryAssert objectCommittableSummaryAssert = - records.element(0, as(committableSummary())).hasCheckpointId(1L); - objectCommittableSummaryAssert.hasOverallCommittables(1); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); - - // Future emission calls should change the output - testHarness.notifyOfCompletedCheckpoint(2); - testHarness.endInput(); - - assertThat(testHarness.getOutput()).hasSize(2); - } - } - - private OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - createTestHarness( - SupportsCommitter sink, - boolean isBatchMode, - boolean isCheckpointingEnabled) - throws Exception { - return new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled)); - } - - private OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - createTestHarness( - SupportsCommitter sink, - boolean isBatchMode, - boolean isCheckpointingEnabled, - int maxParallelism, - int parallelism, - int subtaskId) - throws Exception { - return new 
OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled), - maxParallelism, - parallelism, - subtaskId); - } - - abstract SinkAndCounters sinkWithPostCommit(); - - abstract SinkAndCounters sinkWithPostCommitWithRetry(); - - abstract SinkAndCounters sinkWithoutPostCommit(); - - static class SinkAndCounters { - SupportsCommitter sink; - IntSupplier commitCounter; - - public SinkAndCounters(SupportsCommitter sink, IntSupplier commitCounter) { - this.sink = sink; - this.commitCounter = commitCounter; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 0112b5cf8625e..94a22fb68bfa4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -18,55 +18,350 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.configuration.SinkOptions; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.function.SerializableSupplier; + +import org.assertj.core.api.AbstractThrowableAssert; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collection; +import java.util.function.IntSupplier; + +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +class SinkV2CommitterOperatorTest { + + public static final SerializableSupplier> STRING_SERIALIZER = + () -> new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); -class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { - @Override SinkAndCounters sinkWithPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); + ForwardingCommitter committer = new ForwardingCommitter<>(); return new SinkAndCounters( - (SupportsCommitter) - TestSinkV2.newBuilder() - .setCommitter(committer) - .setWithPostCommitTopology(true) - .build(), + TestSinkV2.newBuilder() + .setWriter(new TestSinkV2.ForwardCommittingSinkWriter()) + .setCommitter(committer, STRING_SERIALIZER) + .setWithPostCommitTopology(true) + .build(), () -> 
committer.successfulCommits); } - @Override SinkAndCounters sinkWithPostCommitWithRetry() { - return new CommitterOperatorTestBase.SinkAndCounters( - (SupportsCommitter) - TestSinkV2.newBuilder() - .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setWithPostCommitTopology(true) - .build(), + return new SinkAndCounters( + TestSinkV2.newBuilder() + .setWriter(new TestSinkV2.ForwardCommittingSinkWriter()) + .setCommitter(new TestSinkV2.RetryOnceCommitter<>(), STRING_SERIALIZER) + .setWithPostCommitTopology(true) + .build(), () -> 0); } - @Override SinkAndCounters sinkWithoutPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); + ForwardingCommitter committer = new ForwardingCommitter<>(); return new SinkAndCounters( - TestSinkV2.newBuilder() - .setCommitter(committer) + TestSinkV2.newBuilder() + .setWriter(new TestSinkV2.ForwardCommittingSinkWriter()) + .setCommitter(committer, STRING_SERIALIZER) .setWithPostCommitTopology(false) - .build() - .asSupportsCommitter(), + .build(), () -> committer.successfulCommits); } - private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmitCommittables(boolean withPostCommitTopology) throws Exception { + SinkAndCounters sinkAndCounters; + if (withPostCommitTopology) { + // Insert global committer to simulate post commit topology + sinkAndCounters = sinkWithPostCommit(); + } else { + sinkAndCounters = sinkWithoutPostCommit(); + } + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); + testHarness.open(); + + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + // Trigger commit + testHarness.notifyOfCompletedCheckpoint(1); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + if (withPostCommitTopology) { + ListAssert> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + records.element(0, as(committableSummary())) + .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); + } else { + assertThat(testHarness.getOutput()).isEmpty(); + } + testHarness.close(); + } + + @Test + void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = createTestHarness(sinkAndCounters.sink, false, true); + testHarness.open(); + testHarness.setProcessingTime(0); + + // Only send first committable + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, 1L, 2, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage first = new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(first)); + + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) + .hasMessageContaining("Trying to commit incomplete batch of 
committables"); + + assertThat(testHarness.getOutput()).isEmpty(); + assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); + + final CommittableWithLineage second = new CommittableWithLineage<>("2", 1L, 1); + testHarness.processElement(new StreamRecord<>(second)); + + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); + ListAssert> records = + assertThat(testHarness.extractOutputValues()).hasSize(3); + records.element(0, as(committableSummary())) + .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); + records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); + records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); + testHarness.close(); + } + + @Test + void testStateRestore() throws Exception { + + final int originalSubtaskId = 0; + final int subtaskIdAfterRecovery = 9; + + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, + false, + true, + 1, + 1, + originalSubtaskId); + testHarness.open(); + + // We cannot test a different checkpoint thant 0 because when using the OperatorTestHarness + // for recovery the lastCompleted checkpoint is always reset to 0. + long checkpointId = 0L; + + final CommittableSummary committableSummary = + new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage first = + new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); + testHarness.processElement(new StreamRecord<>(first)); + + // another committable for the same checkpointId but from different subtask. + final CommittableSummary committableSummary2 = + new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary2)); + final CommittableWithLineage second = + new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); + testHarness.processElement(new StreamRecord<>(second)); + + final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); + assertThat(testHarness.getOutput()).isEmpty(); + testHarness.close(); + + // create new testHarness but with different parallelism level and subtaskId that original + // one. + // we will make sure that new subtaskId was used during committable recovery. + SinkAndCounters restored = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + restoredHarness = + createTestHarness( + restored.sink, false, true, 10, 10, subtaskIdAfterRecovery); + + restoredHarness.initializeState(snapshot); + restoredHarness.open(); + + // Previous committables are immediately committed if possible + assertThat(restored.commitCounter.getAsInt()).isEqualTo(2); + ListAssert> records = + assertThat(restoredHarness.extractOutputValues()).hasSize(3); + CommittableSummaryAssert objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasCheckpointId(checkpointId) + .hasFailedCommittables(0) + .hasSubtaskId(subtaskIdAfterRecovery); + objectCommittableSummaryAssert.hasOverallCommittables(2); + + // Expect the same checkpointId that the original snapshot was made with. 
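+ // (the test harness reports a restored checkpoint id of 0 by default; the
+ // setRestoredCheckpointId setter added above could override this)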
+ records.element(1, as(committableWithLineage())) + .hasCheckpointId(checkpointId) + .hasSubtaskId(subtaskIdAfterRecovery) + .hasCommittable(first.getCommittable()); + records.element(2, as(committableWithLineage())) + .hasCheckpointId(checkpointId) + .hasSubtaskId(subtaskIdAfterRecovery) + .hasCommittable(second.getCommittable()); + restoredHarness.close(); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1}) + void testNumberOfRetries(int numRetries) throws Exception { + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { + testHarness + .getStreamConfig() + .getConfiguration() + .set(SinkOptions.COMMITTER_RETRIES, numRetries); + testHarness.open(); + + long ckdId = 1L; + testHarness.processElement( + new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); + testHarness.processElement( + new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); + AbstractThrowableAssert throwableAssert = + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); + if (numRetries == 0) { + throwableAssert.hasMessageContaining("Failed to commit 1 committables"); + } else { + throwableAssert.doesNotThrowAnyException(); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { + final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>( + sinkAndCounters.sink, false, isCheckpointingEnabled))) { + testHarness.open(); + + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + // If checkpointing enabled endInput does not emit anything because a final checkpoint + // follows + if (isCheckpointingEnabled) { + testHarness.notifyOfCompletedCheckpoint(1); + } + + ListAssert> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + CommittableSummaryAssert objectCommittableSummaryAssert = + records.element(0, as(committableSummary())).hasCheckpointId(1L); + objectCommittableSummaryAssert.hasOverallCommittables(1); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); + + // Future emission calls should change the output + testHarness.notifyOfCompletedCheckpoint(2); + testHarness.endInput(); + + assertThat(testHarness.getOutput()).hasSize(2); + } + } + + private OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + createTestHarness( + SupportsCommitter sink, + boolean isBatchMode, + boolean isCheckpointingEnabled) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled)); + } + + private OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + createTestHarness( + SupportsCommitter sink, + boolean isBatchMode, + boolean isCheckpointingEnabled, + int maxParallelism, + int parallelism, + int subtaskId) + throws Exception { + return new 
OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled), + maxParallelism, + parallelism, + subtaskId); + } + + private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { private int successfulCommits = 0; @Override - public void commit(Collection> committables) { + public void commit(Collection> committables) { successfulCommits += committables.size(); } @Override public void close() throws Exception {} } + + static class SinkAndCounters { + SupportsCommitter sink; + IntSupplier commitCounter; + + @SuppressWarnings("unchecked") + public SinkAndCounters(TestSinkV2 sink, IntSupplier commitCounter) { + this.sink = (SupportsCommitter) sink; + this.commitCounter = commitCounter; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index ae0616e8b97ab..ba3f80d66fa9e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -18,75 +18,517 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommitter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommittingSinkWriter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultSinkWriter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultStatefulSinkWriter; +import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record; +import 
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer; +import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; -import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - InspectableSink sinkWithoutCommitter() { - TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).build()); - } - - @Override - InspectableSink sinkWithCommitter() { - TestSinkV2.DefaultSinkWriter sinkWriter = - new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new InspectableSink( - TestSinkV2.newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - @Override - InspectableSink sinkWithTimeBasedWriter() { - TestSinkV2.DefaultSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter(); - return new InspectableSink( - TestSinkV2.newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - @Override - InspectableSink sinkWithState(boolean withState, String stateName) { - TestSinkV2.DefaultSinkWriter sinkWriter = - new TestSinkV2.DefaultStatefulSinkWriter<>(); - TestSinkV2.Builder builder = - TestSinkV2.newBuilder() - .setDefaultCommitter() +import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; + +class SinkV2SinkWriterOperatorTest { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLoadPreviousSinkState(boolean stateful) throws Exception { + // 1. Build previous sink state + final List previousSinkInputs = + Arrays.asList( + "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", + "cost", "prompt"); + + DefaultStatefulSinkWriter writer = new DefaultStatefulSinkWriter<>(); + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(writer) + .setWriterState(stateful) + .setCompatibleStateNames(CompatibleStateSinkOperator.SINK_STATE_NAME) + .build(); + int expectedState = 5; + OperatorSubtaskState previousSinkState; + try (OneInputStreamOperatorTestHarness previousSink = + new OneInputStreamOperatorTestHarness<>( + new CompatibleStateSinkOperator<>( + TestSinkV2.WRITER_SERIALIZER, expectedState), + StringSerializer.INSTANCE)) { + + previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); + } + + // 2. 
Load previous sink state and verify state + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness> + compatibleWriterOperator = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink))) { + + // load the state from previous sink + compatibleWriterOperator.initializeState(previousSinkState); + assertThat(writer.getRecordCount()).isEqualTo(stateful ? expectedState : 0); + + // 3. do another snapshot and check if this can also be restored without compatible + // state + // name + compatibleWriterOperator.prepareSnapshotPreBarrier(1L); + snapshot = compatibleWriterOperator.snapshot(1L, 1L); + } + + // 4. Restore the sink without previous sink's state + DefaultStatefulSinkWriter restoredWriter = new DefaultStatefulSinkWriter<>(); + TestSinkV2 restoredSink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(restoredWriter) + .setWriterState(stateful) + .setCompatibleStateNames(CompatibleStateSinkOperator.SINK_STATE_NAME) + .build(); + try (OneInputStreamOperatorTestHarness> + restoredSinkOperator = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink))) { + + restoredSinkOperator.initializeState(snapshot); + assertThat(restoredWriter.getRecordCount()).isEqualTo(stateful ? expectedState : 0); + } + } + + @Test + void testNotEmitCommittablesWithoutCommitter() throws Exception { + DefaultSinkWriter writer = new DefaultSinkWriter<>(); + TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).build(); + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + assertThat(writer.getRecordsOfCurrentCheckpoint()) + .containsOnly(new Record<>(1, 1L, Long.MIN_VALUE)); + + testHarness.prepareSnapshotPreBarrier(1); + assertThat(testHarness.extractOutputValues()).isEmpty(); + // Elements are flushed + assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty(); + } + } + + @Test + void testWatermarkPropagatedToSinkWriter() throws Exception { + final long initialTime = 0; + + DefaultSinkWriter writer = new DefaultSinkWriter<>(); + TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).build(); + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processWatermark(initialTime + 1); + + assertThat(testHarness.getOutput()) + .containsExactly( + new org.apache.flink.streaming.api.watermark.Watermark(initialTime), + new org.apache.flink.streaming.api.watermark.Watermark( + initialTime + 1)); + assertThat(writer.getWatermarks()) + .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); + } + } + + @Test + void testTimeBasedBufferingSinkWriter() throws Exception { + final long initialTime = 0; + + DefaultSinkWriter writer = new TimeBasedBufferingSinkWriter(); + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(1, initialTime + 1); + 
testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + + // Expect empty committableSummary + assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); + + testHarness.getProcessingTimeService().setCurrentTime(2001); + + testHarness.prepareSnapshotPreBarrier(2L); + + assertBasicOutput( + testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), + 2, + 2L); + } + } + + @Test + void testEmitOnFlushWithCommitter() throws Exception { + DefaultSinkWriter writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + + testHarness.open(); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processElement(1, 1); + testHarness.processElement(2, 2); + + // flush + testHarness.prepareSnapshotPreBarrier(1); + + assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); + } + } + + @Test + void testEmitOnEndOfInputInBatchMode() throws Exception { + DefaultSinkWriter writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + final SinkWriterOperatorFactory writerOperatorFactory = + new SinkWriterOperatorFactory<>(sink); + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(writerOperatorFactory)) { + + testHarness.open(); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processElement(1, 1); + testHarness.endInput(); + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStateRestore(boolean stateful) throws Exception { + + final long initialTime = 0; + + DefaultStatefulSinkWriter writer = new DefaultStatefulSinkWriter<>(); + final TestSinkV2 sink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWithPostCommitTopology(true) + .setWriter(writer) + .setWriterState(stateful) + .build(); + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + snapshot = testHarness.snapshot(1L, 1L); + + assertThat(writer.getRecordCount()).isEqualTo(2); + assertThat(writer.getLastCheckpointId()).isEqualTo(stateful ? 
1L : -1L); + } + + DefaultStatefulSinkWriter restoredWriter = new DefaultStatefulSinkWriter<>(); + final TestSinkV2 restoredSink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) .setWithPostCommitTopology(true) - .setWriter(sinkWriter); - if (withState) { - builder.setWriterState(true); + .setWriter(restoredWriter) + .setWriterState(stateful) + .build(); + try (OneInputStreamOperatorTestHarness> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink))) { + + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + + // check that the previous state is correctly restored + assertThat(restoredWriter.getRecordCount()).isEqualTo(stateful ? 2 : 0); } - if (stateName != null) { - builder.setCompatibleStateNames(stateName); + } + + @Test + void testRestoreCommitterState() throws Exception { + final List> committables = + Arrays.asList(new Record<>(1, 1L, 1), new Record<>(2, 2L, 2)); + + DefaultSinkWriter writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + final OperatorSubtaskState committerState; + try (OneInputStreamOperatorTestHarness, Record> committer = + new OneInputStreamOperatorTestHarness<>( + new TestCommitterOperator(new RecordSerializer<>()))) { + + committerState = TestHarnessUtil.buildSubtaskState(committer, committables); } - return new InspectableSink(builder.build()); + + final ListAssert> records; + try (OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink))) { + + testHarness.initializeState(committerState); + + testHarness.open(); + + testHarness.prepareSnapshotPreBarrier(2); + + records = assertThat(testHarness.extractOutputValues()).hasSize(4); + } + + records.element(0, as(committableSummary())) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasOverallCommittables(committables.size()); + records.>>element( + 1, as(committableWithLineage())) + .hasCommittable(committables.get(0)) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasSubtaskId(0); + records.>>element( + 2, as(committableWithLineage())) + .hasCommittable(committables.get(1)) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasSubtaskId(0); + records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); } - private static class TimeBasedBufferingSinkWriter - extends TestSinkV2.DefaultCommittingSinkWriter + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { + DefaultSinkWriter writer = new DefaultCommittingSinkWriter<>(); + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setWriter(writer) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .build(); + try (OneInputStreamOperatorTestHarness>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + Record record = new Record<>(1, 1L, Long.MIN_VALUE); + assertThat(writer.getRecordsOfCurrentCheckpoint()).containsOnly(record); + + testHarness.endInput(); + + if (isCheckpointingEnabled) { + testHarness.prepareSnapshotPreBarrier(1); + } + + List> committables = Collections.singletonList(record); + + ListAssert>> records = + 
assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); + records.element(0, as(committableSummary())) + .hasOverallCommittables(committables.size()); + + records.filteredOn(message -> message instanceof CommittableWithLineage) + .map( + message -> + ((CommittableWithLineage>) message) + .getCommittable()) + .containsExactlyInAnyOrderElementsOf(committables); + assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty(); + } + } + + @Test + void testDoubleEndOfInput() throws Exception { + TestSinkV2 sink = + TestSinkV2.newBuilder() + .setWriter(new DefaultCommittingSinkWriter()) + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWriterState(true) + .build(); + + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink))) { + testHarness.open(); + testHarness.processElement(1, 1); + + testHarness.endInput(); + testHarness.prepareSnapshotPreBarrier(1); + snapshot = testHarness.snapshot(1, 1); + + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); + } + + final TestSinkV2 restoredSink = + TestSinkV2.newBuilder() + .setCommitter(new DefaultCommitter<>(), RecordSerializer::new) + .setWriter(new DefaultStatefulSinkWriter()) + .setWriterState(true) + .build(); + try (OneInputStreamOperatorTestHarness> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink))) { + restoredTestHarness.setRestoredCheckpointId(1L); + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + restoredTestHarness.processElement(2, 2); + + restoredTestHarness.endInput(); + restoredTestHarness.prepareSnapshotPreBarrier(3); + restoredTestHarness.snapshot(3, 1); + + // asserts the guessed checkpoint id, which needs to be one higher than the restored checkpoint id + assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L); + } + } + + @Test + void testInitContext() throws Exception { + final AtomicReference initContext = new AtomicReference<>(); + final Sink sink = + context -> { + initContext.set(context); + return null; + }; + + final int subtaskId = 1; + final int parallelism = 10; + final TypeSerializer typeSerializer = StringSerializer.INSTANCE; + final JobID jobID = new JobID(); + + final MockEnvironment environment = + MockEnvironment.builder() + .setSubtaskIndex(subtaskId) + .setParallelism(parallelism) + .setMaxParallelism(parallelism) + .setJobID(jobID) + .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) + .build(); + + try (OneInputStreamOperatorTestHarness>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink), + typeSerializer, + environment)) { + testHarness.open(); + + assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); + assertThat(initContext.get().getMailboxExecutor()).isNotNull(); + assertThat(initContext.get().getProcessingTimeService()).isNotNull(); + assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()) + .isEqualTo(subtaskId); + assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) + .isEqualTo(parallelism); + assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); + assertThat(initContext.get().metricGroup()).isNotNull(); + assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); + assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); + assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); + 
assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); + } + } + + private static void assertBasicOutput( + List> output, + int numberOfCommittables, + long checkpointId) { + ListAssert> records = + assertThat(output).hasSize(numberOfCommittables + 1); + records.element(0, as(committableSummary())).hasOverallCommittables(numberOfCommittables); + records.filteredOn(r -> r instanceof CommittableWithLineage) + .allSatisfy( + cl -> + SinkV2Assertions.assertThat((CommittableWithLineage) cl) + .hasCheckpointId(checkpointId) + .hasSubtaskId(0)); + } + + private static class TimeBasedBufferingSinkWriter extends DefaultCommittingSinkWriter implements ProcessingTimeService.ProcessingTimeCallback { - private final List cachedCommittables = new ArrayList<>(); + private final List> cachedCommittables = new ArrayList<>(); private ProcessingTimeService processingTimeService; @Override public void write(Integer element, Context context) { cachedCommittables.add( - Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); + new Record<>(element, context.timestamp(), context.currentWatermark())); } @Override @@ -103,58 +545,95 @@ public void init(WriterInitContext context) { } } - private static class SnapshottingBufferingSinkWriter - extends TestSinkV2.DefaultStatefulSinkWriter { - public static final int NOT_SNAPSHOTTED = -1; - long lastCheckpointId = NOT_SNAPSHOTTED; - boolean endOfInput = false; + private static class TestCommitterOperator extends AbstractStreamOperator> + implements OneInputStreamOperator, Record> { + + private static final ListStateDescriptor STREAMING_COMMITTER_RAW_STATES_DESC = + new ListStateDescriptor<>( + "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); + private ListState>> committerState; + private final List> buffer = new ArrayList<>(); + private final SimpleVersionedSerializer> serializer; + + public TestCommitterOperator(SimpleVersionedSerializer> serializer) { + this.serializer = serializer; + } @Override - public void flush(boolean endOfInput) throws IOException, InterruptedException { - this.endOfInput = endOfInput; + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + committerState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), + new TestingCommittableSerializer(serializer)); } @Override - public List snapshotState(long checkpointId) throws IOException { - lastCheckpointId = checkpointId; - return super.snapshotState(checkpointId); + public void processElement(StreamRecord> element) { + buffer.add(element.getValue()); } @Override - public Collection prepareCommit() { - if (!endOfInput) { - return ImmutableList.of(); - } - List result = elements; - elements = new ArrayList<>(); - return result; + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + committerState.add(buffer); } } - static class InspectableSink extends AbstractInspectableSink> { - InspectableSink(TestSinkV2 sink) { - super(sink); + /** Writes state to test whether the sink can read from alternative state names. 
*/ + private static class CompatibleStateSinkOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + static final String SINK_STATE_NAME = "compatible_sink_state"; + + static final ListStateDescriptor SINK_STATE_DESC = + new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); + ListState sinkState; + private final SimpleVersionedSerializer serializer; + private final T initialState; + + public CompatibleStateSinkOperator( + SimpleVersionedSerializer serializer, T initialState) { + this.serializer = serializer; + this.initialState = initialState; } - @Override - public long getLastCheckpointId() { - return getSink().getWriter().lastCheckpointId; + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + sinkState = + new SimpleVersionedListState<>( + context.getOperatorStateStore().getListState(SINK_STATE_DESC), + serializer); + if (!context.isRestored()) { + sinkState.add(initialState); + } } @Override - public List getRecordsOfCurrentCheckpoint() { - return getSink().getWriter().elements; + public void processElement(StreamRecord element) { + // do nothing } + } - @Override - public List getWatermarks() { - return getSink().getWriter().watermarks; + private static class TestingCommittableSerializer + extends SinkV1WriterCommittableSerializer> { + + private final SimpleVersionedSerializer> committableSerializer; + + public TestingCommittableSerializer( + SimpleVersionedSerializer> committableSerializer) { + super(committableSerializer); + this.committableSerializer = committableSerializer; } @Override - public int getRecordCountFromState() { - return ((TestSinkV2.DefaultStatefulSinkWriter) getSink().getWriter()) - .getRecordCount(); + public byte[] serialize(List> obj) throws IOException { + final DataOutputSerializer out = new DataOutputSerializer(256); + out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); + SimpleVersionedSerialization.writeVersionAndSerializeList( + committableSerializer, obj, out); + return out.getCopyOfBuffer(); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java deleted file mode 100644 index 6bde86216b9f9..0000000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ /dev/null @@ -1,558 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; -import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; - -import org.assertj.core.api.ListAssert; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; - -abstract class SinkWriterOperatorTestBase { - - @Test - void testNotEmitCommittablesWithoutCommitter() throws Exception { - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - assertThat(sink.getRecordsOfCurrentCheckpoint()) - .containsOnly("(1,1," + Long.MIN_VALUE + ")"); - - testHarness.prepareSnapshotPreBarrier(1); - 
assertThat(testHarness.extractOutputValues()).isEmpty(); - // Elements are flushed - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - testHarness.close(); - } - - @Test - void testWatermarkPropagatedToSinkWriter() throws Exception { - final long initialTime = 0; - - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processWatermark(initialTime + 1); - - assertThat(testHarness.getOutput()) - .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); - assertThat(sink.getWatermarks()) - .containsExactly( - new org.apache.flink.api.common.eventtime.Watermark(initialTime), - new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)); - testHarness.close(); - } - - @Test - void testTimeBasedBufferingSinkWriter() throws Exception { - final long initialTime = 0; - - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); - - testHarness.open(); - - testHarness.setProcessingTime(0L); - - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); - - testHarness.prepareSnapshotPreBarrier(1L); - - // Expect empty committableSummary - assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); - - testHarness.getProcessingTimeService().setCurrentTime(2001); - - testHarness.prepareSnapshotPreBarrier(2L); - - assertBasicOutput( - testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), - 2, - 2L); - testHarness.close(); - } - - @Test - void testEmitOnFlushWithCommitter() throws Exception { - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); - - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); - - testHarness.processElement(1, 1); - testHarness.processElement(2, 2); - - // flush - testHarness.prepareSnapshotPreBarrier(1); - - assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); - testHarness.close(); - } - - @Test - void testEmitOnEndOfInputInBatchMode() throws Exception { - final SinkWriterOperatorFactory writerOperatorFactory = - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); - - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); - - testHarness.processElement(1, 1); - testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testStateRestore(boolean stateful) throws Exception { - - final long initialTime = 0; - - final InspectableSink sink = sinkWithState(stateful, null); - Sink sink2 = sink.getSink(); - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); - - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); - - testHarness.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); - - 
assertThat(sink.getRecordCountFromState()).isEqualTo(2); - assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); - - testHarness.close(); - - final InspectableSink restoredSink = sinkWithState(stateful, null); - final OneInputStreamOperatorTestHarness> - restoredTestHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(restoredSink.getSink())); - - restoredTestHarness.initializeState(snapshot); - restoredTestHarness.open(); - - // check that the previous state is correctly restored - assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0); - - restoredTestHarness.close(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testLoadPreviousSinkState(boolean stateful) throws Exception { - // 1. Build previous sink state - final List previousSinkInputs = - Arrays.asList( - "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", - "cost", "prompt"); - - InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - int expectedState = 5; - final OneInputStreamOperatorTestHarness previousSink = - new OneInputStreamOperatorTestHarness<>( - new CompatibleStateSinkOperator<>( - TestSinkV2.WRITER_SERIALIZER, expectedState), - StringSerializer.INSTANCE); - - OperatorSubtaskState previousSinkState = - TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); - - // 2. Load previous sink state and verify state - Sink sink3 = sink.getSink(); - final OneInputStreamOperatorTestHarness> - compatibleWriterOperator = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink3)); - - // load the state from previous sink - compatibleWriterOperator.initializeState(previousSinkState); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - - // 3. do another snapshot and check if this also can be restored without compabitible state - // name - compatibleWriterOperator.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); - - compatibleWriterOperator.close(); - - // 4. Restore the sink without previous sink's state - InspectableSink sink2 = - sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - final OneInputStreamOperatorTestHarness> - restoredSinkOperator = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink2.getSink())); - - restoredSinkOperator.initializeState(snapshot); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0); - - restoredSinkOperator.close(); - } - - @Test - void testRestoreCommitterState() throws Exception { - final List committables = Arrays.asList("state1", "state2"); - - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness committer = - new OneInputStreamOperatorTestHarness<>( - new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), - StringSerializer.INSTANCE); - - final OperatorSubtaskState committerState = - TestHarnessUtil.buildSubtaskState(committer, committables); - - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - - testHarness.initializeState(committerState); - - testHarness.open(); - - testHarness.prepareSnapshotPreBarrier(2); - - final ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(4); - - records.element(0, as(committableSummary())) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasOverallCommittables(committables.size()); - records.>element(1, as(committableWithLineage())) - .hasCommittable(committables.get(0)) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasSubtaskId(0); - records.>element(2, as(committableWithLineage())) - .hasCommittable(committables.get(1)) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasSubtaskId(0); - records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - final String record = "(1,1," + Long.MIN_VALUE + ")"; - assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); - - testHarness.endInput(); - - if (isCheckpointingEnabled) { - testHarness.prepareSnapshotPreBarrier(1); - } - - List committables = Collections.singletonList(record); - - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); - records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); - - records.filteredOn(message -> message instanceof CommittableWithLineage) - .map(message -> ((CommittableWithLineage) message).getCommittable()) - .containsExactlyInAnyOrderElementsOf(committables); - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - - testHarness.close(); - } - - @Test - void testInitContext() throws Exception { - final AtomicReference initContext = - new AtomicReference<>(); - final org.apache.flink.api.connector.sink2.Sink sink = - context -> { - initContext.set(context); - return null; - }; - - final int subtaskId = 1; - final int parallelism = 10; - final TypeSerializer typeSerializer = StringSerializer.INSTANCE; - final JobID jobID = new JobID(); - - final MockEnvironment environment = - MockEnvironment.builder() - .setSubtaskIndex(subtaskId) - .setParallelism(parallelism) - .setMaxParallelism(parallelism) - .setJobID(jobID) - .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) - .build(); - - final OneInputStreamOperatorTestHarness> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink), typeSerializer, environment); - 
testHarness.open(); - - assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); - assertThat(initContext.get().getMailboxExecutor()).isNotNull(); - assertThat(initContext.get().getProcessingTimeService()).isNotNull(); - assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId); - assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(parallelism); - assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); - assertThat(initContext.get().metricGroup()).isNotNull(); - assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); - assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); - assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); - assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); - - testHarness.close(); - } - - private static void assertContextsEqual( - WriterInitContext initContext, WriterInitContext original) { - assertThat(initContext.getUserCodeClassLoader().asClassLoader()) - .isEqualTo(original.getUserCodeClassLoader().asClassLoader()); - assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor()); - assertThat(initContext.getProcessingTimeService()) - .isEqualTo(original.getProcessingTimeService()); - assertThat(initContext.getTaskInfo().getIndexOfThisSubtask()) - .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask()); - assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks()); - assertThat(initContext.getTaskInfo().getAttemptNumber()) - .isEqualTo(original.getTaskInfo().getAttemptNumber()); - assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup()); - assertThat(initContext.getRestoredCheckpointId()) - .isEqualTo(original.getRestoredCheckpointId()); - assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled()); - assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer()); - assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId()); - assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer()); - } - - private static void assertBasicOutput( - List> output, int numberOfCommittables, long checkpointId) { - ListAssert> records = - assertThat(output).hasSize(numberOfCommittables + 1); - CommittableSummaryAssert objectCommittableSummaryAssert = - records.element(0, as(committableSummary())) - .hasOverallCommittables(numberOfCommittables); - records.filteredOn(r -> r instanceof CommittableWithLineage) - .allSatisfy( - cl -> - SinkV2Assertions.assertThat((CommittableWithLineage) cl) - .hasCheckpointId(checkpointId) - .hasSubtaskId(0)); - } - - private static class TestCommitterOperator extends AbstractStreamOperator - implements OneInputStreamOperator { - - private static final ListStateDescriptor STREAMING_COMMITTER_RAW_STATES_DESC = - new ListStateDescriptor<>( - "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); - private ListState> committerState; - private final List buffer = new ArrayList<>(); - private final SimpleVersionedSerializer serializer; - - public TestCommitterOperator(SimpleVersionedSerializer serializer) { - this.serializer = serializer; - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - committerState = - new SimpleVersionedListState<>( - 
context.getOperatorStateStore() - .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), - new TestingCommittableSerializer(serializer)); - } - - @Override - public void processElement(StreamRecord element) throws Exception { - buffer.add(element.getValue()); - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - super.snapshotState(context); - committerState.add(buffer); - } - } - - /** Writes state to test whether the sink can read from alternative state names. */ - private static class CompatibleStateSinkOperator extends AbstractStreamOperator - implements OneInputStreamOperator { - - static final String SINK_STATE_NAME = "compatible_sink_state"; - - static final ListStateDescriptor SINK_STATE_DESC = - new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); - ListState sinkState; - private final SimpleVersionedSerializer serializer; - private final T initialState; - - public CompatibleStateSinkOperator( - SimpleVersionedSerializer serializer, T initialState) { - this.serializer = serializer; - this.initialState = initialState; - } - - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - sinkState = - new SimpleVersionedListState<>( - context.getOperatorStateStore().getListState(SINK_STATE_DESC), - serializer); - if (!context.isRestored()) { - sinkState.add(initialState); - } - } - - @Override - public void processElement(StreamRecord element) { - // do nothing - } - } - - private static class TestingCommittableSerializer - extends SinkV1WriterCommittableSerializer { - - private final SimpleVersionedSerializer committableSerializer; - - public TestingCommittableSerializer( - SimpleVersionedSerializer committableSerializer) { - super(committableSerializer); - this.committableSerializer = committableSerializer; - } - - @Override - public byte[] serialize(List obj) throws IOException { - final DataOutputSerializer out = new DataOutputSerializer(256); - out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); - SimpleVersionedSerialization.writeVersionAndSerializeList( - committableSerializer, obj, out); - return out.getCopyOfBuffer(); - } - } - - abstract InspectableSink sinkWithoutCommitter(); - - abstract InspectableSink sinkWithTimeBasedWriter(); - - abstract InspectableSink sinkWithState(boolean withState, String stateName); - - abstract InspectableSink sinkWithCommitter(); - - /** - * Basic abstraction to access the different flavors of sinks. Remove once the older interfaces - * are removed. 
- */ - interface InspectableSink { - long getLastCheckpointId(); - - List getRecordsOfCurrentCheckpoint(); - - List getWatermarks(); - - int getRecordCountFromState(); - - Sink getSink(); - } - - abstract static class AbstractInspectableSink< - S extends org.apache.flink.api.connector.sink2.Sink> - implements InspectableSink { - private final S sink; - - protected AbstractInspectableSink(S sink) { - this.sink = sink; - } - - @Override - public S getSink() { - return sink; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java index 5b5ba7dcc2dd5..97e3bdff1faaa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; @@ -30,7 +29,6 @@ import org.apache.flink.api.connector.sink2.SupportsCommitter; import org.apache.flink.api.connector.sink2.SupportsWriterState; import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -41,12 +39,12 @@ import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.SerializableFunction; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableSet; -import javax.annotation.Nullable; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -54,94 +52,93 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; -import java.util.Queue; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.assertj.core.api.Assertions.assertThat; /** A {@link Sink} for all the sink related tests. 
*/ public class TestSinkV2 implements Sink { - public static final SimpleVersionedSerializerAdapter COMMITTABLE_SERIALIZER = - new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); - public static final SimpleVersionedSerializerAdapter WRITER_SERIALIZER = + public static final SimpleVersionedSerializer WRITER_SERIALIZER = new SimpleVersionedSerializerAdapter<>(IntSerializer.INSTANCE); - private final DefaultSinkWriter writer; + private final SinkWriter writer; - private TestSinkV2(DefaultSinkWriter writer) { + private TestSinkV2(SinkWriter writer) { this.writer = writer; } public SinkWriter createWriter(WriterInitContext context) { - writer.init(context); + if (writer instanceof DefaultSinkWriter) { + ((DefaultSinkWriter) writer).init(context); + } return writer; } - DefaultSinkWriter getWriter() { + SinkWriter getWriter() { return writer; } - public static Builder newBuilder() { + public static Builder> newBuilder() { return new Builder<>(); } - public static Builder newBuilder(DefaultSinkWriter writer) { - return new Builder().setWriter(writer); - } - - public SupportsCommitter asSupportsCommitter() { - throw new UnsupportedOperationException("No committter"); + public static Builder> newBuilder( + DefaultSinkWriter writer) { + return new Builder>().setWriter(writer); } /** A builder class for {@link TestSinkV2}. */ - public static class Builder { - private DefaultSinkWriter writer = null; - private DefaultCommitter committer; + public static class Builder { + private SinkWriter writer = null; + private Committer committer; private boolean withPostCommitTopology = false; - private boolean withPreCommitTopology = false; + private SerializableFunction preCommitTopology = null; private boolean withWriterState = false; private String compatibleStateNames; + private SerializableSupplier> commSerializerFactory; - public Builder setWriter(DefaultSinkWriter writer) { - this.writer = checkNotNull(writer); - return this; + @SuppressWarnings("unchecked") + public Builder setWriter( + CommittingSinkWriter writer) { + Builder self = (Builder) this; + self.writer = checkNotNull(writer); + return self; } - public Builder setCommitter(DefaultCommitter committer) { - this.committer = committer; - return this; + @SuppressWarnings("unchecked") + public Builder setWriter(SinkWriter writer) { + Builder self = (Builder) this; + self.writer = checkNotNull(writer); + return self; } - public Builder setDefaultCommitter() { - this.committer = new DefaultCommitter(); - return this; - } - - public Builder setDefaultCommitter( - Supplier>> queueSupplier) { - this.committer = new DefaultCommitter(queueSupplier); + public Builder setCommitter( + Committer committer, + SerializableSupplier> commSerializerFactory) { + this.committer = committer; + this.commSerializerFactory = commSerializerFactory; return this; } - public Builder setWithPostCommitTopology(boolean withPostCommitTopology) { + public Builder setWithPostCommitTopology(boolean withPostCommitTopology) { this.withPostCommitTopology = withPostCommitTopology; return this; } - public Builder setWithPreCommitTopology(boolean withPreCommitTopology) { - this.withPreCommitTopology = withPreCommitTopology; + public Builder setWithPreCommitTopology( + SerializableFunction preCommitTopology) { + this.preCommitTopology = preCommitTopology; return this; } - public Builder setWriterState(boolean withWriterState) { + public Builder setWriterState(boolean withWriterState) { this.withWriterState = withWriterState; return this; } - public Builder 
setCompatibleStateNames(String compatibleStateNames) { + public Builder setCompatibleStateNames(String compatibleStateNames) { this.compatibleStateNames = compatibleStateNames; return this; } @@ -158,139 +155,130 @@ public TestSinkV2 build() { writer = new DefaultCommittingSinkWriter<>(); } if (!withPostCommitTopology) { - if (!withPreCommitTopology) { + if (preCommitTopology == null) { // TwoPhaseCommittingSink with a stateless writer and a committer return new TestSinkV2TwoPhaseCommittingSink<>( - writer, COMMITTABLE_SERIALIZER, committer); + writer, commSerializerFactory, committer); } else { // TwoPhaseCommittingSink with a stateless writer, pre commit topology, // committer - Preconditions.checkArgument( - writer instanceof DefaultCommittingSinkWriter, - "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPreCommitTopology<>( - writer, COMMITTABLE_SERIALIZER, committer); + writer, commSerializerFactory, committer, preCommitTopology); } } else { if (withWriterState) { // TwoPhaseCommittingSink with a stateful writer and a committer and post // commit topology - Preconditions.checkArgument( - writer instanceof DefaultStatefulSinkWriter, - "Please provide a DefaultStatefulSinkWriter instance"); return new TestStatefulSinkV2<>( - (DefaultStatefulSinkWriter) writer, - COMMITTABLE_SERIALIZER, - committer, - compatibleStateNames); + writer, commSerializerFactory, committer, compatibleStateNames); } else { // TwoPhaseCommittingSink with a stateless writer and a committer and post // commit topology - Preconditions.checkArgument( - writer instanceof DefaultCommittingSinkWriter, - "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPostCommitTopology<>( - writer, COMMITTABLE_SERIALIZER, committer); + writer, commSerializerFactory, committer); } } } } } - private static class TestSinkV2TwoPhaseCommittingSink extends TestSinkV2 - implements SupportsCommitter { - private final DefaultCommitter committer; - private final SimpleVersionedSerializer committableSerializer; + private static class TestSinkV2TwoPhaseCommittingSink extends TestSinkV2 + implements SupportsCommitter { + private final Committer committer; + protected final SerializableSupplier> + commSerializerFactory; public TestSinkV2TwoPhaseCommittingSink( - DefaultSinkWriter writer, - SimpleVersionedSerializer committableSerializer, - DefaultCommitter committer) { + SinkWriter writer, + SerializableSupplier> commSerializerFactory, + Committer committer) { super(writer); this.committer = committer; - this.committableSerializer = committableSerializer; + this.commSerializerFactory = commSerializerFactory; } @Override - public Committer createCommitter(CommitterInitContext context) { - committer.init(); + public Committer createCommitter(CommitterInitContext context) { + if (committer instanceof DefaultCommitter) { + ((DefaultCommitter) committer).init(); + } return committer; } @Override - public SupportsCommitter asSupportsCommitter() { - return this; - } - - @Override - public SimpleVersionedSerializer getCommittableSerializer() { - return committableSerializer; + public SimpleVersionedSerializer getCommittableSerializer() { + return commSerializerFactory.get(); } } // -------------------------------------- Sink With PostCommitTopology ------------------------- - private static class TestSinkV2WithPostCommitTopology - extends TestSinkV2TwoPhaseCommittingSink - implements SupportsPostCommitTopology { + private static class TestSinkV2WithPostCommitTopology + extends 
TestSinkV2TwoPhaseCommittingSink + implements SupportsPostCommitTopology { public TestSinkV2WithPostCommitTopology( - DefaultSinkWriter writer, - SimpleVersionedSerializer committableSerializer, - DefaultCommitter committer) { - super(writer, committableSerializer, committer); + SinkWriter writer, + SerializableSupplier> commSerializerFactory, + Committer committer) { + super(writer, commSerializerFactory, committer); } @Override - public void addPostCommitTopology(DataStream> committables) { + public void addPostCommitTopology(DataStream> committables) { StandardSinkTopologies.addGlobalCommitter( committables, this::createCommitter, this::getCommittableSerializer); } } - private static class TestSinkV2WithPreCommitTopology - extends TestSinkV2TwoPhaseCommittingSink - implements SupportsPreCommitTopology { + private static class TestSinkV2WithPreCommitTopology + extends TestSinkV2TwoPhaseCommittingSink + implements SupportsPreCommitTopology { + private final SerializableFunction preCommitTopology; + public TestSinkV2WithPreCommitTopology( - DefaultSinkWriter writer, - SimpleVersionedSerializer committableSerializer, - DefaultCommitter committer) { - super(writer, committableSerializer, committer); + SinkWriter writer, + SerializableSupplier> commSerializerFactory, + Committer committer, + SerializableFunction preCommitTopology) { + super(writer, commSerializerFactory, committer); + this.preCommitTopology = preCommitTopology; } @Override - public DataStream> addPreCommitTopology( - DataStream> committables) { + public DataStream> addPreCommitTopology( + DataStream> committables) { return committables .map( m -> { if (m instanceof CommittableSummary) { return m; } else { - CommittableWithLineage withLineage = - (CommittableWithLineage) m; - return withLineage.map(old -> old + "Transformed"); + CommittableWithLineage withLineage = + (CommittableWithLineage) m; + return withLineage.map(preCommitTopology); } }) - .returns(CommittableMessageTypeInfo.of(() -> COMMITTABLE_SERIALIZER)); + .returns(CommittableMessageTypeInfo.of(commSerializerFactory)); } @Override - public SimpleVersionedSerializer getWriteResultSerializer() { - return new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); + public SimpleVersionedSerializer getWriteResultSerializer() { + return commSerializerFactory.get(); } } - private static class TestStatefulSinkV2 extends TestSinkV2WithPostCommitTopology + private static class TestStatefulSinkV2 + extends TestSinkV2WithPostCommitTopology implements SupportsWriterState, SupportsWriterState.WithCompatibleState { private final String compatibleState; public TestStatefulSinkV2( - DefaultStatefulSinkWriter writer, - SimpleVersionedSerializer committableSerializer, - DefaultCommitter committer, + SinkWriter writer, + SerializableSupplier> commSerializerFactory, + Committer committer, String compatibleState) { - super(writer, committableSerializer, committer); + super(writer, commSerializerFactory, committer); this.compatibleState = compatibleState; } @@ -303,7 +291,7 @@ public DefaultStatefulSinkWriter createWriter(WriterInitContext context) public StatefulSinkWriter restoreWriter( WriterInitContext context, Collection recoveredState) { DefaultStatefulSinkWriter statefulWriter = - (DefaultStatefulSinkWriter) getWriter(); + (DefaultStatefulSinkWriter) getWriter(); statefulWriter.restore(recoveredState); return statefulWriter; @@ -322,10 +310,90 @@ public Collection getCompatibleWriterStateNames() { // -------------------------------------- Sink Writer 
------------------------------------------ + public static class Record implements Serializable { + private final T value; + private final Long timestamp; + private final long watermark; + + public Record(T value, Long timestamp, long watermark) { + this.value = value; + this.timestamp = timestamp; + this.watermark = watermark; + } + + public T getValue() { + return value; + } + + public Long getTimestamp() { + return timestamp; + } + + public long getWatermark() { + return watermark; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + Record record = (Record) object; + return Objects.equals(timestamp, record.timestamp) + && watermark == record.watermark + && Objects.equals(value, record.value); + } + + @Override + public int hashCode() { + return Objects.hash(value, timestamp, watermark); + } + + @Override + public String toString() { + return "Record{" + + "value=" + + value + + ", timestamp=" + + timestamp + + ", watermark=" + + watermark + + '}'; + } + + public Record withValue(T value) { + return new Record<>(value, timestamp, watermark); + } + } + + public static class RecordSerializer implements SimpleVersionedSerializer> { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Record record) throws IOException { + return InstantiationUtil.serializeObject(record); + } + + @Override + public Record deserialize(int version, byte[] serialized) throws IOException { + try { + return InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } + /** Base class for out testing {@link SinkWriter}. */ public static class DefaultSinkWriter implements SinkWriter, Serializable { - protected List elements; + protected List> elements; protected List watermarks; @@ -338,8 +406,7 @@ protected DefaultSinkWriter() { @Override public void write(InputT element, Context context) { - elements.add( - Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); + elements.add(new Record<>(element, context.timestamp(), context.currentWatermark())); } @Override @@ -347,6 +414,18 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException { elements = new ArrayList<>(); } + public List> getRecordsOfCurrentCheckpoint() { + return elements; + } + + public List getWatermarks() { + return watermarks; + } + + public long getLastCheckpointId() { + return lastCheckpointId; + } + @Override public void writeWatermark(Watermark watermark) { watermarks.add(watermark); @@ -364,7 +443,23 @@ public void init(WriterInitContext context) { /** Base class for out testing {@link CommittingSinkWriter}. 
*/ protected static class DefaultCommittingSinkWriter extends DefaultSinkWriter - implements CommittingSinkWriter, Serializable { + implements CommittingSinkWriter>, Serializable { + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // We empty the elements on prepareCommit + } + + @Override + public Collection> prepareCommit() { + List> result = elements; + elements = new ArrayList<>(); + return result; + } + } + + protected static class ForwardCommittingSinkWriter extends DefaultSinkWriter + implements CommittingSinkWriter, Serializable { @Override public void flush(boolean endOfInput) throws IOException, InterruptedException { @@ -372,8 +467,9 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException { } @Override - public Collection prepareCommit() { - List result = elements; + public Collection prepareCommit() { + List result = + elements.stream().map(Record::getValue).collect(Collectors.toList()); elements = new ArrayList<>(); return result; } @@ -412,42 +508,15 @@ protected void restore(Collection recoveredState) { // -------------------------------------- Sink Committer --------------------------------------- /** Base class for testing {@link Committer}. */ - public static class DefaultCommitter implements Committer, Serializable { - - @Nullable protected Queue> committedData; - + public static class DefaultCommitter implements Committer, Serializable { private boolean isClosed; - @Nullable private final Supplier>> queueSupplier; - public DefaultCommitter() { - this.committedData = new ConcurrentLinkedQueue<>(); this.isClosed = false; - this.queueSupplier = null; - } - - public DefaultCommitter(@Nullable Supplier>> queueSupplier) { - this.queueSupplier = queueSupplier; - this.isClosed = false; - this.committedData = null; - } - - public List> getCommittedData() { - if (committedData != null) { - return new ArrayList<>(committedData); - } else { - return Collections.emptyList(); - } } @Override - public void commit(Collection> committables) { - if (committedData == null) { - assertThat(queueSupplier).isNotNull(); - committedData = queueSupplier.get(); - } - committedData.addAll(committables); - } + public void commit(Collection> committables) {} public void close() throws Exception { isClosed = true; @@ -463,18 +532,15 @@ public void init() { } /** A {@link Committer} that always re-commits the committables data it received. 
     /** A {@link Committer} that always re-commits the committables data it received. */
-    static class RetryOnceCommitter extends DefaultCommitter {
+    static class RetryOnceCommitter extends DefaultCommitter<Record<Integer>> {
 
-        private final Set<String> seen = new LinkedHashSet<>();
+        private final Set<Record<Integer>> seen = new LinkedHashSet<>();
 
         @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
+        public void commit(Collection<CommitRequest<Record<Integer>>> committables) {
             committables.forEach(
                     c -> {
-                        if (seen.remove(c.getCommittable())) {
-                            checkNotNull(committedData);
-                            committedData.add(c);
-                        } else {
+                        if (!seen.remove(c.getCommittable())) {
                             seen.add(c.getCommittable());
                             c.retryLater();
                         }
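RetryOnceCommitter no longer records successful commits itself; it only drives the retry protocol: the first delivery of a committable is answered with retryLater(), the second falls through and counts as committed. A compact, hypothetical trace of that behavior (harness and values are illustrative, not part of the patch):

    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    class RetryOnceTrace {
        public static void main(String[] args) {
            Set<String> seen = new LinkedHashSet<>();
            for (String committable : List.of("a", "a")) { // same committable delivered twice
                if (!seen.remove(committable)) {
                    seen.add(committable);
                    System.out.println(committable + ": retryLater()");
                } else {
                    System.out.println(committable + ": committed");
                }
            }
        }
    }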
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
deleted file mode 100644
index bb7795e38d47e..0000000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-
-import java.util.Collection;
-
-class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
-
-    @Override
-    SinkAndCounters sinkWithPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.newBuilder()
-                                .setCommitter(committer)
-                                .setWithPostCommitTopology(true)
-                                .build(),
-                () -> committer.successfulCommits);
-    }
-
-    @Override
-    SinkAndCounters sinkWithPostCommitWithRetry() {
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.newBuilder()
-                                .setCommitter(new TestSinkV2.RetryOnceCommitter())
-                                .setWithPostCommitTopology(true)
-                                .build(),
-                () -> 0);
-    }
-
-    @Override
-    SinkAndCounters sinkWithoutPostCommit() {
-        ForwardingCommitter committer = new ForwardingCommitter();
-        return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.newBuilder().setCommitter(committer).build(),
-                () -> committer.successfulCommits);
-    }
-
-    private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter {
-        private int successfulCommits = 0;
-
-        @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
-            successfulCommits += committables.size();
-        }
-
-        @Override
-        public void close() throws Exception {}
-    }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
deleted file mode 100644
index b0bf67fe4d2a5..0000000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-    @Override
-    InspectableSink sinkWithoutCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>();
-        return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).build());
-    }
-
-    @Override
-    InspectableSink sinkWithCommitter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultCommittingSinkWriter<>();
-        return new InspectableSink(
-                TestSinkV2.newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    @Override
-    InspectableSink sinkWithTimeBasedWriter() {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter();
-        return new InspectableSink(
-                TestSinkV2.newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .build());
-    }
-
-    @Override
-    InspectableSink sinkWithState(boolean withState, String stateName) {
-        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
-                new TestSinkV2.DefaultStatefulSinkWriter<>();
-        TestSinkV2.Builder<Integer> builder =
-                TestSinkV2.newBuilder()
-                        .setWriter(sinkWriter)
-                        .setDefaultCommitter()
-                        .setWithPostCommitTopology(true);
-        builder.setWriterState(withState);
-        if (stateName != null) {
-            builder.setCompatibleStateNames(stateName);
-        }
-        return new InspectableSink(builder.build());
-    }
-
-    private static class TimeBasedBufferingSinkWriter
-            extends TestSinkV2.DefaultStatefulSinkWriter<Integer>
-            implements ProcessingTimeService.ProcessingTimeCallback {
-
-        private final List<String> cachedCommittables = new ArrayList<>();
-        private ProcessingTimeService processingTimeService;
-
-        @Override
-        public void write(Integer element, Context context) {
-            cachedCommittables.add(
-                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
-        }
-
-        @Override
-        public void onProcessingTime(long time) {
-            elements.addAll(cachedCommittables);
-            cachedCommittables.clear();
-            this.processingTimeService.registerTimer(time + 1000, this);
-        }
-
-        @Override
-        public void init(WriterInitContext context) {
-            this.processingTimeService = context.getProcessingTimeService();
-            this.processingTimeService.registerTimer(1000, this);
-        }
-    }
-
-    static class InspectableSink
-            extends AbstractInspectableSink<TestSinkV2<Integer>> {
-        private final TestSinkV2<Integer> sink;
-
-        InspectableSink(TestSinkV2<Integer> sink) {
-            super(sink);
-            this.sink = sink;
-        }
-
-        @Override
-        public long getLastCheckpointId() {
-            return sink.getWriter().lastCheckpointId;
-        }
-
-        @Override
-        public List<String> getRecordsOfCurrentCheckpoint() {
-            return sink.getWriter().elements;
-        }
-
-        @Override
-        public List<Watermark> getWatermarks() {
-            return sink.getWriter().watermarks;
-        }
-
-        @Override
-        public int getRecordCountFromState() {
-            TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = sink.getWriter();
-            if (sinkWriter instanceof TestSinkV2.DefaultStatefulSinkWriter) {
-                return ((TestSinkV2.DefaultStatefulSinkWriter<Integer>) sinkWriter)
-                        .getRecordCount();
-            } else {
-                return 0;
-            }
-        }
-    }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 15e00e70b0f97..88e89e3185e33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -18,159 +18,163 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
-import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Integration test for {@link org.apache.flink.api.connector.sink2.Sink} run time implementation.
  */
-public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
+public class SinkV2ITCase extends AbstractTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkV2ITCase.class);
+
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434,
                     970, 714, 795, 288, 422);
 
-    // source send data two times
-    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;
-
-    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
+    static final List<Record<Integer>> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
             SOURCE_DATA.stream()
                     // source send data two times
                     .flatMap(
                             x ->
-                                    Collections.nCopies(
-                                                    2, Tuple3.of(x, null, Long.MIN_VALUE).toString())
+                                    Collections.nCopies(2, new Record<>(x, null, Long.MIN_VALUE))
                                             .stream())
                     .collect(Collectors.toList());
 
-    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
+    static final List<Record<Integer>> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
             SOURCE_DATA.stream()
-                    .map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString())
+                    .map(x -> new Record<>(x, null, Long.MIN_VALUE))
                     .collect(Collectors.toList());
 
-    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE =
-            new ConcurrentLinkedQueue<>();
-
-    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA =
-            (BooleanSupplier & Serializable)
-                    () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
-
-    @Before
-    public void init() {
-        COMMIT_QUEUE.clear();
-    }
+    @RegisterExtension
+    static final SharedObjectsExtension SHARED_OBJECTS = SharedObjectsExtension.create();
 
     @Test
     public void writerAndCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final Source<Integer, ?, ?> source = createStreamingSource();
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        (Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), RecordSerializer::new)
                                 .build());
         env.execute();
 
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
     }
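The SharedObjectsExtension pattern used here replaces the old static COMMIT_QUEUE: the queue lives in the test JVM, and only a SharedReference handle travels through serialization into the sink. A minimal usage sketch, written as a hypothetical extra test in this class (the string payload is illustrative):

    @Test
    void sharedQueueSketch() throws Exception {
        SharedReference<Queue<String>> queue = SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
        // Only the handle is serialized into operators; get() resolves to the one queue
        // instance living in the test JVM, so additions from task threads are visible here.
        queue.get().add("observed on the cluster side");
        assertThat(queue.get()).contains("observed on the cluster side");
    }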
     @Test
     public void writerAndPrecommitToplogyAndCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
-        final FiniteTestSource<Integer> source =
-                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final Source<Integer, ?, ?> source = createStreamingSource();
 
-        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        (Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
-                                .setWithPreCommitTopology(true)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), RecordSerializer::new)
+                                .setWithPreCommitTopology(SinkV2ITCase::flipValue)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
                         EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.stream()
-                                .map(s -> s + "Transformed")
-                                .toArray()));
+                                .map(SinkV2ITCase::flipValue)
+                                .collect(Collectors.toList()));
+    }
+
+    private static Record<Integer> flipValue(Record<Integer> r) {
+        return r.withValue(-r.getValue());
     }
 
     @Test
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
 
-        env.fromData(SOURCE_DATA)
+        final DataGeneratorSource<Integer> source =
+                new DataGeneratorSource<>(
+                        l -> SOURCE_DATA.get(l.intValue()),
+                        SOURCE_DATA.size(),
+                        IntegerTypeInfo.INT_TYPE_INFO);
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        (Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), RecordSerializer::new)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
     }
 
     @Test
     public void writerAndPrecommitToplogyAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+
+        final DataGeneratorSource<Integer> source =
+                new DataGeneratorSource<>(
+                        l -> SOURCE_DATA.get(l.intValue()),
+                        SOURCE_DATA.size(),
+                        IntegerTypeInfo.INT_TYPE_INFO);
 
-        env.fromData(SOURCE_DATA)
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
-                                .setDefaultCommitter(
-                                        (Supplier<Queue<Committer.CommitRequest<String>>>
-                                                        & Serializable)
-                                                () -> COMMIT_QUEUE)
-                                .setWithPreCommitTopology(true)
+                                .setCommitter(
+                                        new TrackingCommitter(committed), RecordSerializer::new)
+                                .setWithPreCommitTopology(SinkV2ITCase::flipValue)
                                 .build());
         env.execute();
-        assertThat(
-                COMMIT_QUEUE.stream()
-                        .map(Committer.CommitRequest::getCommittable)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
                         EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.stream()
-                                .map(s -> s + "Transformed")
-                                .toArray()));
+                                .map(SinkV2ITCase::flipValue)
+                                .collect(Collectors.toList()));
     }
 
     private StreamExecutionEnvironment buildStreamEnv() {
@@ -185,4 +189,60 @@ private StreamExecutionEnvironment buildBatchEnv() {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
+
+    /**
+     * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then
+     * waits for two more checkpoints to complete, 3) then re-emits the same elements before 4)
+     * waiting for another two checkpoints and 5) exiting.
+     */
+    private Source<Integer, ?, ?> createStreamingSource() {
+        RateLimiterStrategy rateLimiterStrategy =
+                parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2);
+        return new DataGeneratorSource<>(
+                l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
+                SOURCE_DATA.size() * 2L,
+                rateLimiterStrategy,
+                IntegerTypeInfo.INT_TYPE_INFO);
+    }
+
+    private static class BurstingRateLimiter implements RateLimiter {
+        private final RateLimiter rateLimiter;
+        private final int numCheckpointCooldown;
+        private int cooldown;
+
+        public BurstingRateLimiter(int recordPerCycle, int numCheckpointCooldown) {
+            rateLimiter = new GatedRateLimiter(recordPerCycle);
+            this.numCheckpointCooldown = numCheckpointCooldown;
+        }
+
+        @Override
+        public CompletionStage<Void> acquire() {
+            CompletionStage<Void> stage = rateLimiter.acquire();
+            cooldown = numCheckpointCooldown;
+            return stage;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            if (cooldown-- <= 0) {
+                rateLimiter.notifyCheckpointComplete(checkpointId);
+            }
+        }
+    }
+
+    private static class TrackingCommitter implements Committer<Record<Integer>>, Serializable {
+        private final SharedReference<Queue<CommitRequest<Record<Integer>>>> committed;
+
+        public TrackingCommitter(
+                SharedReference<Queue<CommitRequest<Record<Integer>>>> committed) {
+            this.committed = committed;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<Record<Integer>>> committables) {
+            committed.get().addAll(committables);
+        }
+
+        @Override
+        public void close() {}
+    }
 }
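BurstingRateLimiter composes GatedRateLimiter (which hands out recordPerCycle permits and then blocks until a checkpoint completes) with a cooldown that swallows the first N checkpoint notifications, yielding the emit/pause cycles described in the Javadoc above. A hedged driver sketch; it assumes GatedRateLimiter completes a pending acquire() once a checkpoint finishes, which is my reading of the API, not something stated in this patch:

    import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
    import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;

    import java.util.concurrent.CompletionStage;

    class GateSketch {
        public static void main(String[] args) {
            RateLimiter limiter = new GatedRateLimiter(5);
            for (int i = 0; i < 5; i++) {
                limiter.acquire(); // first burst: permits are available immediately
            }
            CompletionStage<Void> blocked = limiter.acquire(); // gate is now closed
            limiter.notifyCheckpointComplete(1); // reopens the gate
            System.out.println(blocked.toCompletableFuture().isDone()); // expected: true
        }
    }

With BurstingRateLimiter(recordPerCycle = 5, cooldown = 2) the timeline is: a burst of five records resets the cooldown to 2; the first two completed checkpoints are swallowed (cooldown 2, then 1); the third is forwarded to the gated limiter and the next burst may start.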
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
index 55b049371a838..108d026b98c63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
@@ -138,12 +139,15 @@ public void testCommitterMetrics() throws Exception {
                 sharedObjects.add(new CountDownLatch(numCommittables));
         SharedReference<CountDownLatch> afterLatch = sharedObjects.add(new CountDownLatch(1));
 
+        TestSinkV2<Long> sink =
+                TestSinkV2.<Long>newBuilder()
+                        .setCommitter(
+                                new MetricCommitter(beforeLatch, afterLatch),
+                                TestSinkV2.RecordSerializer::new)
+                        .build();
         env.fromSequence(0, numCommittables - 1)
                 .returns(BasicTypeInfo.LONG_TYPE_INFO)
-                .sinkTo(
-                        (TestSinkV2.<Long>newBuilder()
-                                        .setCommitter(new MetricCommitter(beforeLatch, afterLatch)))
-                                .build())
+                .sinkTo(sink)
                 .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
@@ -295,7 +299,7 @@ public void write(Long element, Context context) {
         }
     }
 
-    private static class MetricCommitter extends TestSinkV2.DefaultCommitter {
+    private static class MetricCommitter extends TestSinkV2.DefaultCommitter<Record<Long>> {
         private int counter = 0;
         private SharedReference<CountDownLatch> beforeLatch;
         private SharedReference<CountDownLatch> afterLatch;
@@ -309,7 +313,7 @@ private static class MetricCommitter extends TestSinkV2.DefaultCommitter {
         }
 
         @Override
-        public void commit(Collection<CommitRequest<String>> committables) {
+        public void commit(Collection<CommitRequest<Record<Long>>> committables) {
             if (counter == 0) {
                 System.err.println(
                         "Committables arrived "
@@ -333,26 +337,26 @@ public void commit(Collection<CommitRequest<Record<Long>>> committables) {
 
             committables.forEach(
                     c -> {
-                        switch (c.getCommittable().charAt(1)) {
-                            case '0':
+                        switch (c.getCommittable().getValue().intValue()) {
+                            case 0:
                                 c.signalAlreadyCommitted();
                                 // 1 already committed
                                 break;
-                            case '1':
-                            case '2':
+                            case 1:
+                            case 2:
                                 // 2 failed
                                 c.signalFailedWithKnownReason(new RuntimeException());
                                 break;
-                            case '3':
+                            case 3:
                                 // Retry without change
                                 if (counter == 1) {
                                     c.retryLater();
                                 }
                                 break;
-                            case '4':
-                            case '5':
+                            case 4:
+                            case 5:
                                 // Retry with change
-                                c.updateAndRetryLater("Retry-" + c.getCommittable());
+                                c.updateAndRetryLater(c.getCommittable().withValue(6L));
                         }
                     });
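For orientation, the value-based switch above exercises the four reactions a Committer may record on a CommitRequest. A hedged, self-contained committer showing the same routing (class name, thresholds, and messages are illustrative, not part of the patch):

    import org.apache.flink.api.connector.sink2.Committer;
    import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;

    import java.io.Serializable;
    import java.util.Collection;

    class OutcomeSketchCommitter implements Committer<Record<Long>>, Serializable {
        @Override
        public void commit(Collection<CommitRequest<Record<Long>>> requests) {
            for (CommitRequest<Record<Long>> c : requests) {
                long v = c.getCommittable().getValue();
                if (v == 0) {
                    c.signalAlreadyCommitted(); // committed by an external system earlier
                } else if (v <= 2) {
                    c.signalFailedWithKnownReason(new RuntimeException("known failure"));
                } else if (v == 3) {
                    c.retryLater(); // retry the same committable on the next attempt
                } else {
                    c.updateAndRetryLater(c.getCommittable().withValue(6L)); // retry updated
                }
            }
        }

        @Override
        public void close() {}
    }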