Skip to content

Commit 6a5fab8

Browse files
authored
[FLINK-37208][Runtime] Properly notify a new key is selected for async state operators (#26068)
1 parent 19e868f commit 6a5fab8

File tree

16 files changed

+172
-108
lines changed

16 files changed

+172
-108
lines changed

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedProcessOperator.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.datastream.impl.common.OutputCollector;
2727
import org.apache.flink.datastream.impl.common.TimestampCollector;
2828
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
29-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3029

3130
import javax.annotation.Nullable;
3231

@@ -83,10 +82,8 @@ protected NonPartitionedContext<OUT> getNonPartitionedContext() {
8382
}
8483

8584
@Override
86-
@SuppressWarnings({"rawtypes"})
87-
public void setKeyContextElement1(StreamRecord record) throws Exception {
88-
super.setKeyContextElement1(record);
89-
keySet.add(getCurrentKey());
85+
public void newKeySelected(Object newKey) {
86+
keySet.add(newKey);
9087
}
9188

9289
@Override

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.datastream.impl.common.OutputCollector;
2727
import org.apache.flink.datastream.impl.common.TimestampCollector;
2828
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
29-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3029

3130
import javax.annotation.Nullable;
3231

@@ -89,17 +88,8 @@ protected NonPartitionedContext<OUT> getNonPartitionedContext() {
8988
}
9089

9190
@Override
92-
@SuppressWarnings({"rawtypes"})
93-
public void setKeyContextElement1(StreamRecord record) throws Exception {
94-
super.setKeyContextElement1(record);
95-
keySet.add(getCurrentKey());
96-
}
97-
98-
@Override
99-
@SuppressWarnings({"rawtypes"})
100-
public void setKeyContextElement2(StreamRecord record) throws Exception {
101-
super.setKeyContextElement2(record);
102-
keySet.add(getCurrentKey());
91+
public void newKeySelected(Object newKey) {
92+
keySet.add(newKey);
10393
}
10494

10595
@Override

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoOutputProcessOperator.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.datastream.impl.common.OutputCollector;
2727
import org.apache.flink.datastream.impl.common.TimestampCollector;
2828
import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext;
29-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3029
import org.apache.flink.util.OutputTag;
3130
import org.apache.flink.util.Preconditions;
3231

@@ -105,10 +104,8 @@ protected TwoOutputNonPartitionedContext<OUT_MAIN, OUT_SIDE> getNonPartitionedCo
105104
}
106105

107106
@Override
108-
@SuppressWarnings({"rawtypes"})
109-
public void setKeyContextElement1(StreamRecord record) throws Exception {
110-
super.setKeyContextElement1(record);
111-
keySet.add(getCurrentKey());
107+
public void newKeySelected(Object newKey) {
108+
keySet.add(newKey);
112109
}
113110

114111
@Override

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.flink.streaming.api.operators.InternalTimer;
3535
import org.apache.flink.streaming.api.operators.InternalTimerService;
3636
import org.apache.flink.streaming.api.operators.Triggerable;
37-
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3837

3938
import javax.annotation.Nullable;
4039

@@ -117,11 +116,8 @@ protected NonPartitionedContext<OUT> getNonPartitionedContext() {
117116
}
118117

119118
@Override
120-
@SuppressWarnings({"rawtypes"})
121-
// Only element from input1 should be considered as the other side is broadcast input.
122-
public void setKeyContextElement1(StreamRecord record) throws Exception {
123-
super.setKeyContextElement1(record);
124-
keySet.add(getCurrentKey());
119+
public void newKeySelected(Object newKey) {
120+
keySet.add(newKey);
125121
}
126122

127123
@Override

Diff for: flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import org.apache.flink.datastream.impl.operators.MockSumAggregateProcessFunction;
3131
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
3232
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
33-
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
3433
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
34+
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
3535

3636
import org.junit.jupiter.api.Test;
3737

@@ -98,8 +98,8 @@ void testListState() throws Exception {
9898
new KeyedProcessOperator<>(
9999
function, (KeySelector<Integer, Integer>) value -> value);
100100

101-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
102-
new KeyedOneInputStreamOperatorTestHarness<>(
101+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
102+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
103103
processOperator,
104104
(KeySelector<Integer, Integer>) value -> value,
105105
Types.INT)) {
@@ -120,8 +120,8 @@ void testAggState() throws Exception {
120120
KeyedProcessOperator<Integer, Integer, Integer> processOperator =
121121
new KeyedProcessOperator<>(function);
122122

123-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
124-
new KeyedOneInputStreamOperatorTestHarness<>(
123+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
124+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
125125
processOperator,
126126
(KeySelector<Integer, Integer>) value -> value,
127127
Types.INT)) {
@@ -148,8 +148,8 @@ void testValueState() throws Exception {
148148
KeyedProcessOperator<Integer, Integer, Integer> processOperator =
149149
new KeyedProcessOperator<>(function);
150150

151-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
152-
new KeyedOneInputStreamOperatorTestHarness<>(
151+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
152+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
153153
processOperator,
154154
(KeySelector<Integer, Integer>) value -> value,
155155
Types.INT)) {
@@ -175,8 +175,8 @@ void testMapState() throws Exception {
175175
KeyedProcessOperator<Integer, Integer, Integer> processOperator =
176176
new KeyedProcessOperator<>(function);
177177

178-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
179-
new KeyedOneInputStreamOperatorTestHarness<>(
178+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
179+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
180180
processOperator,
181181
(KeySelector<Integer, Integer>) value -> value,
182182
Types.INT)) {
@@ -203,8 +203,8 @@ void testReducingState() throws Exception {
203203
KeyedProcessOperator<Integer, Integer, Integer> processOperator =
204204
new KeyedProcessOperator<>(function);
205205

206-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
207-
new KeyedOneInputStreamOperatorTestHarness<>(
206+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
207+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
208208
processOperator,
209209
(KeySelector<Integer, Integer>) value -> value,
210210
Types.INT)) {
@@ -231,8 +231,8 @@ void testBroadcastMapState() throws Exception {
231231
KeyedProcessOperator<Integer, Integer, Integer> processOperator =
232232
new KeyedProcessOperator<>(function);
233233

234-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
235-
new KeyedOneInputStreamOperatorTestHarness<>(
234+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
235+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
236236
processOperator,
237237
(KeySelector<Integer, Integer>) value -> value,
238238
Types.INT)) {
@@ -260,8 +260,8 @@ void testBroadcastListState() throws Exception {
260260
new KeyedProcessOperator<>(
261261
function, (KeySelector<Integer, Integer>) value -> value);
262262

263-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
264-
new KeyedOneInputStreamOperatorTestHarness<>(
263+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
264+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
265265
processOperator,
266266
(KeySelector<Integer, Integer>) value -> value,
267267
Types.INT)) {

Diff for: flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.datastream.api.context.PartitionedContext;
2626
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
2727
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
28-
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
28+
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
2929

3030
import org.junit.jupiter.api.Test;
3131

@@ -51,8 +51,8 @@ public void processRecord(
5151
}
5252
});
5353

54-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
55-
new KeyedOneInputStreamOperatorTestHarness<>(
54+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
55+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
5656
processOperator,
5757
(KeySelector<Integer, Integer>) value -> value,
5858
Types.INT)) {
@@ -98,8 +98,8 @@ public void endInput(NonPartitionedContext<Integer> ctx) {
9898
}
9999
});
100100

101-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
102-
new KeyedOneInputStreamOperatorTestHarness<>(
101+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
102+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
103103
processOperator,
104104
(KeySelector<Integer, Integer>) value -> value,
105105
Types.INT)) {
@@ -133,8 +133,8 @@ public void processRecord(
133133
// -1 is an invalid key in this suite.
134134
(ignore) -> -1);
135135

136-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
137-
new KeyedOneInputStreamOperatorTestHarness<>(
136+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
137+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
138138
processOperator,
139139
(KeySelector<Integer, Integer>) value -> value,
140140
Types.INT)) {

Diff for: flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.datastream.api.context.PartitionedContext;
2626
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
2727
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
28-
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
28+
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
2929

3030
import org.junit.jupiter.api.Test;
3131

@@ -61,8 +61,8 @@ public void processRecordFromBroadcastInput(
6161
}
6262
});
6363

64-
try (KeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
65-
new KeyedTwoInputStreamOperatorTestHarness<>(
64+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
65+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
6666
processOperator,
6767
(KeySelector<Integer, Long>) (data) -> (long) (data + 1),
6868
(KeySelector<Long, Long>) value -> value + 1,
@@ -130,11 +130,11 @@ public void endBroadcastInput(NonPartitionedContext<Long> ctx) {
130130
}
131131
});
132132

133-
try (KeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
134-
new KeyedTwoInputStreamOperatorTestHarness<>(
133+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
134+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
135135
processOperator,
136136
(KeySelector<Integer, Long>) Long::valueOf,
137-
(KeySelector<Long, Long>) value -> value,
137+
null,
138138
Types.LONG)) {
139139
testHarness.open();
140140
testHarness.processElement1(new StreamRecord<>(1)); // key is 1L
@@ -175,8 +175,8 @@ public void processRecordFromBroadcastInput(
175175
// -1 is an invalid key in this suite.
176176
(out) -> -1L);
177177

178-
try (KeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
179-
new KeyedTwoInputStreamOperatorTestHarness<>(
178+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
179+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
180180
processOperator,
181181
(KeySelector<Integer, Long>) Long::valueOf,
182182
(KeySelector<Long, Long>) value -> value,

Diff for: flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java

+15-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.datastream.api.context.PartitionedContext;
2626
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
2727
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
28-
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
28+
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
2929

3030
import org.junit.jupiter.api.Test;
3131

@@ -59,8 +59,8 @@ public void processRecordFromSecondInput(
5959
}
6060
});
6161

62-
try (KeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
63-
new KeyedTwoInputStreamOperatorTestHarness<>(
62+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
63+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
6464
processOperator,
6565
(KeySelector<Integer, Long>) (data) -> (long) (data + 1),
6666
(KeySelector<Long, Long>) value -> value + 1,
@@ -134,8 +134,8 @@ public void endSecondInput(NonPartitionedContext<Long> ctx) {
134134
}
135135
});
136136

137-
try (KeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
138-
new KeyedTwoInputStreamOperatorTestHarness<>(
137+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
138+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
139139
processOperator,
140140
(KeySelector<Integer, Long>) Long::valueOf,
141141
(KeySelector<Long, Long>) value -> value,
@@ -183,15 +183,23 @@ public void processRecordFromSecondInput(
183183
// -1 is an invalid key in this suite.
184184
(out) -> -1L);
185185

186-
try (KeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
187-
new KeyedTwoInputStreamOperatorTestHarness<>(
186+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
187+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
188188
processOperator,
189189
(KeySelector<Integer, Long>) Long::valueOf,
190190
(KeySelector<Long, Long>) value -> value,
191191
Types.LONG)) {
192192
testHarness.open();
193193
assertThatThrownBy(() -> testHarness.processElement1(new StreamRecord<>(1)))
194194
.isInstanceOf(IllegalStateException.class);
195+
}
196+
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Long, Integer, Long, Long> testHarness =
197+
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
198+
processOperator,
199+
(KeySelector<Integer, Long>) Long::valueOf,
200+
(KeySelector<Long, Long>) value -> value,
201+
Types.LONG)) {
202+
testHarness.open();
195203
assertThatThrownBy(() -> testHarness.processElement2(new StreamRecord<>(1L)))
196204
.isInstanceOf(IllegalStateException.class);
197205
}

Diff for: flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
2626
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
2727
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
28-
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
28+
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
2929
import org.apache.flink.util.OutputTag;
3030

3131
import org.junit.jupiter.api.Test;
@@ -59,8 +59,8 @@ public void processRecord(
5959
},
6060
sideOutputTag);
6161

62-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
63-
new KeyedOneInputStreamOperatorTestHarness<>(
62+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
63+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
6464
processOperator,
6565
(KeySelector<Integer, Integer>) value -> value,
6666
Types.INT)) {
@@ -116,8 +116,8 @@ public void endInput(
116116
},
117117
sideOutputTag);
118118

119-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
120-
new KeyedOneInputStreamOperatorTestHarness<>(
119+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
120+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
121121
processOperator,
122122
(KeySelector<Integer, Integer>) value -> value,
123123
Types.INT)) {
@@ -161,14 +161,21 @@ public void processRecord(
161161
// -1 is an invalid key in this suite.
162162
(KeySelector<Long, Integer>) value -> -1);
163163

164-
try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
165-
new KeyedOneInputStreamOperatorTestHarness<>(
164+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
165+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
166166
processOperator,
167167
(KeySelector<Integer, Integer>) value -> value,
168168
Types.INT)) {
169169
testHarness.open();
170170
assertThatThrownBy(() -> testHarness.processElement(new StreamRecord<>(1)))
171171
.isInstanceOf(IllegalStateException.class);
172+
}
173+
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
174+
AsyncKeyedOneInputStreamOperatorTestHarness.create(
175+
processOperator,
176+
(KeySelector<Integer, Integer>) value -> value,
177+
Types.INT)) {
178+
testHarness.open();
172179
emitToFirstOutput.set(false);
173180
assertThatThrownBy(() -> testHarness.processElement(new StreamRecord<>(1)))
174181
.isInstanceOf(IllegalStateException.class);

0 commit comments

Comments
 (0)