Skip to content

Commit fd673a2

Browse files
authored
[FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces
1 parent b309ceb commit fd673a2

File tree

7 files changed

+186
-64
lines changed

7 files changed

+186
-64
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.apache.flink.streaming.runtime.operators.sink;
2020

2121
import org.apache.flink.annotation.Internal;
22-
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
22+
import org.apache.flink.api.connector.sink2.SupportsCommitter;
2323
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
24-
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
24+
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
2525
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
2626
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
2727
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -41,14 +41,12 @@ public final class CommitterOperatorFactory<CommT>
4141
implements OneInputStreamOperatorFactory<
4242
CommittableMessage<CommT>, CommittableMessage<CommT>> {
4343

44-
private final TwoPhaseCommittingSink<?, CommT> sink;
44+
private final SupportsCommitter<CommT> sink;
4545
private final boolean isBatchMode;
4646
private final boolean isCheckpointingEnabled;
4747

4848
public CommitterOperatorFactory(
49-
TwoPhaseCommittingSink<?, CommT> sink,
50-
boolean isBatchMode,
51-
boolean isCheckpointingEnabled) {
49+
SupportsCommitter<CommT> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
5250
this.sink = checkNotNull(sink);
5351
this.isBatchMode = isBatchMode;
5452
this.isCheckpointingEnabled = isCheckpointingEnabled;
@@ -65,7 +63,7 @@ public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOpera
6563
processingTimeService,
6664
sink.getCommittableSerializer(),
6765
context -> sink.createCommitter(context),
68-
sink instanceof WithPostCommitTopology,
66+
sink instanceof SupportsPostCommitTopology,
6967
isBatchMode,
7068
isCheckpointingEnabled);
7169
committerOperator.setup(

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@
2424
import org.apache.flink.api.common.state.ListStateDescriptor;
2525
import org.apache.flink.api.common.typeutils.TypeSerializer;
2626
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
27+
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
2728
import org.apache.flink.api.connector.sink2.Sink;
2829
import org.apache.flink.api.connector.sink2.Sink.InitContext;
2930
import org.apache.flink.api.connector.sink2.SinkWriter;
30-
import org.apache.flink.api.connector.sink2.StatefulSink;
31-
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
32-
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
31+
import org.apache.flink.api.connector.sink2.SupportsCommitter;
32+
import org.apache.flink.api.connector.sink2.SupportsWriterState;
3333
import org.apache.flink.api.connector.sink2.WriterInitContext;
3434
import org.apache.flink.core.io.SimpleVersionedSerializer;
3535
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -113,18 +113,17 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
113113
this.processingTimeService = checkNotNull(processingTimeService);
114114
this.mailboxExecutor = checkNotNull(mailboxExecutor);
115115
this.context = new Context<>();
116-
this.emitDownstream = sink instanceof TwoPhaseCommittingSink;
116+
this.emitDownstream = sink instanceof SupportsCommitter;
117117

118-
if (sink instanceof StatefulSink) {
118+
if (sink instanceof SupportsWriterState) {
119119
writerStateHandler =
120-
new StatefulSinkWriterStateHandler<>((StatefulSink<InputT, ?>) sink);
120+
new StatefulSinkWriterStateHandler<>((SupportsWriterState<InputT, ?>) sink);
121121
} else {
122122
writerStateHandler = new StatelessSinkWriterStateHandler<>(sink);
123123
}
124124

125-
if (sink instanceof TwoPhaseCommittingSink) {
126-
committableSerializer =
127-
((TwoPhaseCommittingSink<InputT, CommT>) sink).getCommittableSerializer();
125+
if (sink instanceof SupportsCommitter) {
126+
committableSerializer = ((SupportsCommitter<CommT>) sink).getCommittableSerializer();
128127
} else {
129128
committableSerializer = null;
130129
}
@@ -188,13 +187,13 @@ private void emitCommittables(Long checkpointId) throws IOException, Interrupted
188187
if (!emitDownstream) {
189188
// To support SinkV1 topologies with only a writer we have to call prepareCommit
190189
// although no committables are forwarded
191-
if (sinkWriter instanceof PrecommittingSinkWriter) {
192-
((PrecommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
190+
if (sinkWriter instanceof CommittingSinkWriter) {
191+
((CommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
193192
}
194193
return;
195194
}
196195
Collection<CommT> committables =
197-
((PrecommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
196+
((CommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
198197
StreamingRuntimeContext runtimeContext = getRuntimeContext();
199198
final int indexOfThisSubtask = runtimeContext.getTaskInfo().getIndexOfThisSubtask();
200199
final int numberOfParallelSubtasks =

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
2525
import org.apache.flink.api.connector.sink2.Sink;
2626
import org.apache.flink.api.connector.sink2.SinkWriter;
27-
import org.apache.flink.api.connector.sink2.StatefulSink;
2827
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
2928
import org.apache.flink.api.connector.sink2.SupportsWriterState;
3029
import org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
@@ -79,10 +78,12 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
7978

8079
private StatefulSinkWriter<InputT, WriterStateT> sinkWriter;
8180

82-
public StatefulSinkWriterStateHandler(StatefulSink<InputT, WriterStateT> sink) {
83-
this.sink = sink;
81+
public StatefulSinkWriterStateHandler(SupportsWriterState<InputT, WriterStateT> sink) {
82+
Preconditions.checkArgument(
83+
sink instanceof Sink, "Should be an instance of " + Sink.class.getName());
84+
this.sink = (Sink<InputT>) sink;
8485
Collection<String> previousSinkStateNames =
85-
sink instanceof StatefulSink.WithCompatibleState
86+
sink instanceof SupportsWriterState.WithCompatibleState
8687
? ((WithCompatibleState) sink).getCompatibleWriterStateNames()
8788
: Collections.emptyList();
8889
this.writerStateSimpleVersionedSerializer = sink.getWriterStateSerializer();
@@ -116,10 +117,6 @@ public SinkWriter<InputT> createWriter(
116117
Iterables.addAll(states, previousSinkState.get());
117118
}
118119

119-
if (!(sink instanceof SupportsWriterState)) {
120-
throw new IllegalArgumentException("Sink should implement SupportsWriterState");
121-
}
122-
123120
sinkWriter = ((SupportsWriterState) sink).restoreWriter(initContext, states);
124121
} else {
125122
sinkWriter = cast(sink.createWriter(initContext));

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java

+55-30
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@
2424
import org.apache.flink.api.common.operators.SlotSharingGroup;
2525
import org.apache.flink.api.common.typeinfo.TypeInformation;
2626
import org.apache.flink.api.connector.sink2.Sink;
27-
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
27+
import org.apache.flink.api.connector.sink2.SupportsCommitter;
2828
import org.apache.flink.api.dag.Transformation;
2929
import org.apache.flink.configuration.CoreOptions;
3030
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
3131
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
3232
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
33-
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
34-
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
35-
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
33+
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
34+
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
35+
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
3636
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
3737
import org.apache.flink.streaming.api.datastream.DataStream;
3838
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -44,6 +44,7 @@
4444
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
4545
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
4646
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
47+
import org.apache.flink.util.Preconditions;
4748

4849
import javax.annotation.Nullable;
4950

@@ -135,16 +136,27 @@ private void expand() {
135136

136137
DataStream<T> prewritten = inputStream;
137138

138-
if (sink instanceof WithPreWriteTopology) {
139+
if (sink instanceof SupportsPreWriteTopology) {
139140
prewritten =
140141
adjustTransformations(
141142
prewritten,
142-
((WithPreWriteTopology<T>) sink)::addPreWriteTopology,
143+
((SupportsPreWriteTopology<T>) sink)::addPreWriteTopology,
143144
true,
144145
sink instanceof SupportsConcurrentExecutionAttempts);
145146
}
146147

147-
if (sink instanceof TwoPhaseCommittingSink) {
148+
if (sink instanceof SupportsPreCommitTopology) {
149+
Preconditions.checkArgument(
150+
sink instanceof SupportsCommitter,
151+
"Sink with SupportsPreCommitTopology should implement SupportsCommitter");
152+
}
153+
if (sink instanceof SupportsPostCommitTopology) {
154+
Preconditions.checkArgument(
155+
sink instanceof SupportsCommitter,
156+
"Sink with SupportsPostCommitTopology should implement SupportsCommitter");
157+
}
158+
159+
if (sink instanceof SupportsCommitter) {
148160
addCommittingTopology(sink, prewritten);
149161
} else {
150162
adjustTransformations(
@@ -173,32 +185,27 @@ private void expand() {
173185
}
174186
}
175187

176-
private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
177-
TwoPhaseCommittingSink<T, CommT> committingSink =
178-
(TwoPhaseCommittingSink<T, CommT>) sink;
179-
TypeInformation<CommittableMessage<CommT>> typeInformation =
188+
private <CommT, WriteResultT> void addCommittingTopology(
189+
Sink<T> sink, DataStream<T> inputStream) {
190+
SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
191+
TypeInformation<CommittableMessage<CommT>> committableTypeInformation =
180192
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
181193

182-
DataStream<CommittableMessage<CommT>> written =
183-
adjustTransformations(
184-
inputStream,
185-
input ->
186-
input.transform(
187-
WRITER_NAME,
188-
typeInformation,
189-
new SinkWriterOperatorFactory<>(sink)),
190-
false,
191-
sink instanceof SupportsConcurrentExecutionAttempts);
194+
DataStream<CommittableMessage<CommT>> precommitted;
195+
if (sink instanceof SupportsPreCommitTopology) {
196+
SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink =
197+
(SupportsPreCommitTopology<WriteResultT, CommT>) sink;
198+
TypeInformation<CommittableMessage<WriteResultT>> writeResultTypeInformation =
199+
CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
192200

193-
DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);
201+
DataStream<CommittableMessage<WriteResultT>> writerResult =
202+
addWriter(sink, inputStream, writeResultTypeInformation);
194203

195-
if (sink instanceof WithPreCommitTopology) {
196204
precommitted =
197205
adjustTransformations(
198-
precommitted,
199-
((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology,
200-
true,
201-
false);
206+
writerResult, preCommittingSink::addPreCommitTopology, true, false);
207+
} else {
208+
precommitted = addWriter(sink, inputStream, committableTypeInformation);
202209
}
203210

204211
DataStream<CommittableMessage<CommT>> committed =
@@ -207,27 +214,45 @@ private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStre
207214
pc ->
208215
pc.transform(
209216
COMMITTER_NAME,
210-
typeInformation,
217+
committableTypeInformation,
211218
new CommitterOperatorFactory<>(
212219
committingSink,
213220
isBatchMode,
214221
isCheckpointingEnabled)),
215222
false,
216223
false);
217224

218-
if (sink instanceof WithPostCommitTopology) {
225+
if (sink instanceof SupportsPostCommitTopology) {
219226
DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
220227
adjustTransformations(
221228
postcommitted,
222229
pc -> {
223-
((WithPostCommitTopology<T, CommT>) sink).addPostCommitTopology(pc);
230+
((SupportsPostCommitTopology<CommT>) sink).addPostCommitTopology(pc);
224231
return null;
225232
},
226233
true,
227234
false);
228235
}
229236
}
230237

238+
private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(
239+
Sink<T> sink,
240+
DataStream<T> inputStream,
241+
TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
242+
DataStream<CommittableMessage<WriteResultT>> written =
243+
adjustTransformations(
244+
inputStream,
245+
input ->
246+
input.transform(
247+
WRITER_NAME,
248+
typeInformation,
249+
new SinkWriterOperatorFactory<>(sink)),
250+
false,
251+
sink instanceof SupportsConcurrentExecutionAttempts);
252+
253+
return addFailOverRegion(written);
254+
}
255+
231256
/**
232257
* Adds a batch exchange that materializes the output first. This is a no-op in STREAMING.
233258
*/

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,7 @@ public void testSettingOperatorUidHash() {
6767
.setWriterUidHash(writerHash)
6868
.setCommitterUidHash(committerHash)
6969
.build();
70-
src.sinkTo(
71-
TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(),
72-
operatorsUidHashes)
73-
.name(NAME);
70+
src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
7471

7572
final StreamGraph streamGraph = env.getStreamGraph();
7673

@@ -87,9 +84,7 @@ public void testSettingOperatorUids() {
8784
final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
8885
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8986
final DataStreamSource<Integer> src = env.fromElements(1, 2);
90-
src.sinkTo(TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build())
91-
.name(NAME)
92-
.uid(sinkUid);
87+
src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid);
9388

9489
final StreamGraph streamGraph = env.getStreamGraph();
9590
assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
7575
import org.apache.flink.streaming.util.NoOpIntMap;
7676
import org.apache.flink.streaming.util.TestExpandingSink;
77+
import org.apache.flink.streaming.util.TestExpandingSinkWithMixin;
7778
import org.apache.flink.util.AbstractID;
7879
import org.apache.flink.util.Collector;
7980
import org.apache.flink.util.OutputTag;
@@ -908,6 +909,28 @@ public void testAutoParallelismForExpandedTransformations() {
908909
});
909910
}
910911

912+
@Test
913+
public void testAutoParallelismForExpandedTransformationsWithMixin() {
914+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
915+
916+
env.setParallelism(2);
917+
918+
DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
919+
// Parallelism is set to -1 (default parallelism identifier) to imitate the behavior of
920+
// the table planner. Parallelism should be set automatically after translating.
921+
sourceDataStream.sinkTo(new TestExpandingSinkWithMixin()).setParallelism(-1);
922+
923+
StreamGraph graph = env.getStreamGraph();
924+
925+
graph.getStreamNodes()
926+
.forEach(
927+
node -> {
928+
if (!node.getOperatorName().startsWith("Source")) {
929+
Assertions.assertThat(node.getParallelism()).isEqualTo(2);
930+
}
931+
});
932+
}
933+
911934
@Test
912935
public void testCacheTransformation() {
913936
final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();

0 commit comments

Comments (0)