Skip to content

Commit 6570509

Browse files
jiexray authored and Zakelly committed
[FLINK-37028][Runtime] Integrate async window operator to DataStream API
1 parent bcb0316 commit 6570509

File tree

6 files changed

+92
-18
lines changed

6 files changed

+92
-18
lines changed

flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2121
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
2222
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.configuration.Configuration;
2324
import org.apache.flink.configuration.MemorySize;
25+
import org.apache.flink.configuration.StateBackendOptions;
2426
import org.apache.flink.connector.file.sink.FileSink;
2527
import org.apache.flink.connector.file.src.FileSource;
2628
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
2729
import org.apache.flink.streaming.api.datastream.DataStream;
30+
import org.apache.flink.streaming.api.datastream.KeyedStream;
2831
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2932
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
3033
import org.apache.flink.streaming.examples.wordcount.WordCount;
@@ -33,6 +36,8 @@
3336

3437
import java.time.Duration;
3538

39+
import static org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME;
40+
3641
/**
3742
* Implements a windowed version of the streaming "WordCount" program.
3843
*
@@ -88,6 +93,14 @@ public static void main(String[] args) throws Exception {
8893
// available in the Flink UI.
8994
env.getConfig().setGlobalJobParameters(params);
9095

96+
if (params.isAsyncState()) {
97+
Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
98+
if (!config.containsKey(StateBackendOptions.STATE_BACKEND.key())) {
99+
config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
100+
env.configure(config);
101+
}
102+
}
103+
91104
DataStream<String> text;
92105
if (params.getInputs().isPresent()) {
93106
// Create a new file source that will read files from a given set of directories.
@@ -108,7 +121,7 @@ public static void main(String[] args) throws Exception {
108121
int windowSize = params.getInt("window").orElse(250);
109122
int slideSize = params.getInt("slide").orElse(150);
110123

111-
DataStream<Tuple2<String, Integer>> counts =
124+
KeyedStream<Tuple2<String, Integer>, String> keyedStream =
112125
// The text lines read from the source are split into words
113126
// using a user-defined function. The tokenizer, implemented below,
114127
// will output each word as a 2-tuple containing (word, 1)
@@ -118,7 +131,14 @@ public static void main(String[] args) throws Exception {
118131
// Using a keyBy allows performing aggregations and other
119132
// stateful transformations over data on a per-key basis.
120133
// This is similar to a GROUP BY clause in a SQL query.
121-
.keyBy(value -> value.f0)
134+
.keyBy(value -> value.f0);
135+
136+
if (params.isAsyncState()) {
137+
keyedStream.enableAsyncState();
138+
}
139+
140+
DataStream<Tuple2<String, Integer>> counts =
141+
keyedStream
122142
// create windows of windowSize records that slide every slideSize records
123143
.countWindow(windowSize, slideSize)
124144
// For each key, we perform a simple sum of the "1" field, the count.

flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,13 @@ public static void main(String[] args) throws Exception {
8282
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8383

8484
// For async state, by default we will use the forst state backend.
85-
Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
86-
config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
87-
env.configure(config);
85+
if (params.isAsyncState()) {
86+
Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
87+
if (!config.containsKey(StateBackendOptions.STATE_BACKEND.key())) {
88+
config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
89+
env.configure(config);
90+
}
91+
}
8892

8993
// Apache Flink’s unified approach to stream and batch processing means that a DataStream
9094
// application executed over bounded input will produce the same final results regardless

flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java

+22
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,28 @@ public void testWindowWordCount() throws Exception {
126126
checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
127127
}
128128

129+
@Test
130+
public void testAsyncWindowWordCount() throws Exception {
131+
final String windowSize = "25";
132+
final String slideSize = "15";
133+
final String textPath = createTempFile("text.txt", WordCountData.TEXT);
134+
final String resultPath = getTempDirPath("result");
135+
136+
org.apache.flink.streaming.examples.windowing.WindowWordCount.main(
137+
new String[] {
138+
"--input", textPath,
139+
"--output", resultPath,
140+
"--window", windowSize,
141+
"--slide", slideSize,
142+
"--async-state"
143+
});
144+
145+
// Since the parallel tokenizers might run at different speeds,
146+
// the exact output cannot be checked, only whether it is well-formed.
147+
// This checks that the result lines look like e.g. (faust,2)
148+
checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
149+
}
150+
129151
@Test
130152
public void testWordCount() throws Exception {
131153
final String textPath = createTempFile("text.txt", WordCountData.TEXT);

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ public <R> StateBootstrapTransformation<T> reduce(
153153
function = input.getExecutionEnvironment().clean(function);
154154
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
155155

156-
WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
156+
WindowOperator<K, T, ?, R, W> operator =
157+
(WindowOperator<K, T, ?, R, W>) builder.reduce(reduceFunction, function);
157158

158159
SavepointWriterOperatorFactory factory =
159160
(timestamp, path) ->
@@ -180,7 +181,8 @@ public <R> StateBootstrapTransformation<T> reduce(
180181
function = input.getExecutionEnvironment().clean(function);
181182
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
182183

183-
WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
184+
WindowOperator<K, T, ?, R, W> operator =
185+
(WindowOperator<K, T, ?, R, W>) builder.reduce(reduceFunction, function);
184186

185187
SavepointWriterOperatorFactory factory =
186188
(timestamp, path) ->
@@ -316,7 +318,8 @@ public <ACC, V, R> StateBootstrapTransformation<T> aggregate(
316318
aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
317319

318320
WindowOperator<K, T, ?, R, W> operator =
319-
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
321+
(WindowOperator<K, T, ?, R, W>)
322+
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
320323

321324
SavepointWriterOperatorFactory factory =
322325
(timestamp, path) ->
@@ -394,7 +397,8 @@ public <ACC, V, R> StateBootstrapTransformation<T> aggregate(
394397
aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
395398

396399
WindowOperator<K, T, ?, R, W> operator =
397-
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
400+
(WindowOperator<K, T, ?, R, W>)
401+
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
398402

399403
SavepointWriterOperatorFactory factory =
400404
(timestamp, path) ->
@@ -419,7 +423,8 @@ public <ACC, V, R> StateBootstrapTransformation<T> aggregate(
419423
* @return The data stream that is the result of applying the window function to the window.
420424
*/
421425
public <R> StateBootstrapTransformation<T> apply(WindowFunction<T, R, K, W> function) {
422-
WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
426+
WindowOperator<K, T, ?, R, W> operator =
427+
(WindowOperator<K, T, ?, R, W>) builder.apply(function);
423428

424429
SavepointWriterOperatorFactory factory =
425430
(timestamp, path) ->
@@ -444,7 +449,8 @@ public <R> StateBootstrapTransformation<T> apply(
444449
WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
445450
function = input.getExecutionEnvironment().clean(function);
446451

447-
WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
452+
WindowOperator<K, T, ?, R, W> operator =
453+
(WindowOperator<K, T, ?, R, W>) builder.apply(function);
448454

449455
SavepointWriterOperatorFactory factory =
450456
(timestamp, path) ->
@@ -466,7 +472,8 @@ public <R> StateBootstrapTransformation<T> apply(
466472
*/
467473
@PublicEvolving
468474
public <R> StateBootstrapTransformation<T> process(ProcessWindowFunction<T, R, K, W> function) {
469-
WindowOperator<K, T, ?, R, W> operator = builder.process(function);
475+
WindowOperator<K, T, ?, R, W> operator =
476+
(WindowOperator<K, T, ?, R, W>) builder.process(function);
470477

471478
SavepointWriterOperatorFactory factory =
472479
(timestamp, path) ->

flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,13 @@ public class WindowedStream<T, K, W extends Window> {
7777

7878
private final WindowOperatorBuilder<T, K, W> builder;
7979

80+
private boolean isEnableAsyncState;
81+
8082
@PublicEvolving
8183
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
8284

8385
this.input = input;
86+
this.isEnableAsyncState = input.isEnableAsyncState();
8487

8588
this.builder =
8689
new WindowOperatorBuilder<>(
@@ -216,7 +219,10 @@ public <R> SingleOutputStreamOperator<R> reduce(
216219
final String opName = builder.generateOperatorName();
217220
final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
218221

219-
OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
222+
OneInputStreamOperator<T, R> operator =
223+
isEnableAsyncState
224+
? builder.asyncReduce(reduceFunction, function)
225+
: builder.reduce(reduceFunction, function);
220226
return input.transform(opName, resultType, operator).setDescription(opDescription);
221227
}
222228

@@ -263,7 +269,10 @@ public <R> SingleOutputStreamOperator<R> reduce(
263269

264270
final String opName = builder.generateOperatorName();
265271
final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
266-
OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
272+
OneInputStreamOperator<T, R> operator =
273+
isEnableAsyncState
274+
? builder.asyncReduce(reduceFunction, function)
275+
: builder.reduce(reduceFunction, function);
267276

268277
return input.transform(opName, resultType, operator).setDescription(opDescription);
269278
}
@@ -414,7 +423,9 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
414423
builder.generateOperatorDescription(aggregateFunction, windowFunction);
415424

416425
OneInputStreamOperator<T, R> operator =
417-
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
426+
isEnableAsyncState
427+
? builder.asyncAggregate(aggregateFunction, windowFunction, accumulatorType)
428+
: builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
418429

419430
return input.transform(opName, resultType, operator).setDescription(opDescription);
420431
}
@@ -525,7 +536,9 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
525536
builder.generateOperatorDescription(aggregateFunction, windowFunction);
526537

527538
OneInputStreamOperator<T, R> operator =
528-
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
539+
isEnableAsyncState
540+
? builder.asyncAggregate(aggregateFunction, windowFunction, accumulatorType)
541+
: builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
529542

530543
return input.transform(opName, resultType, operator).setDescription(opDescription);
531544
}
@@ -569,7 +582,8 @@ public <R> SingleOutputStreamOperator<R> apply(
569582

570583
final String opName = builder.generateOperatorName();
571584
final String opDescription = builder.generateOperatorDescription(function, null);
572-
OneInputStreamOperator<T, R> operator = builder.apply(function);
585+
OneInputStreamOperator<T, R> operator =
586+
isEnableAsyncState ? builder.asyncApply(function) : builder.apply(function);
573587

574588
return input.transform(opName, resultType, operator).setDescription(opDescription);
575589
}
@@ -613,7 +627,8 @@ public <R> SingleOutputStreamOperator<R> process(
613627
final String opName = builder.generateOperatorName();
614628
final String opDesc = builder.generateOperatorDescription(function, null);
615629

616-
OneInputStreamOperator<T, R> operator = builder.process(function);
630+
OneInputStreamOperator<T, R> operator =
631+
isEnableAsyncState ? builder.asyncProcess(function) : builder.process(function);
617632

618633
return input.transform(opName, resultType, operator).setDescription(opDesc);
619634
}
@@ -861,6 +876,7 @@ private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregato
861876
@Experimental
862877
public WindowedStream<T, K, W> enableAsyncState() {
863878
input.enableAsyncState();
879+
this.isEnableAsyncState = true;
864880
return this;
865881
}
866882

flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java

+5
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,9 @@ public boolean isOutputOnlyAfterEndOfStream() {
192192
public boolean isInternalSorterSupported() {
193193
return operatorFactory.getOperatorAttributes().isInternalSorterSupported();
194194
}
195+
196+
@Override
197+
public void enableAsyncState() {
198+
// nothing to do.
199+
}
195200
}

0 commit comments

Comments
 (0)