Skip to content

Commit d98051f

Browse files
authored
[FLINK-37168][python] Clean up TimerRegistrationAction in unregisteredTimers list after timers are registered (#26011)
1 parent 6b6a73e commit d98051f

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.flink.streaming.api.operators.python.process.timer;
2020

21+
import java.util.List;
22+
23+
/** {@link TimerRegistrationAction} used to register Timer. */
2124
public class TimerRegistrationAction {
2225

2326
private final TimerRegistration timerRegistration;
@@ -26,17 +29,33 @@ public class TimerRegistrationAction {
2629

2730
private boolean isRegistered;
2831

32+
private final List<TimerRegistrationAction> containingList;
33+
2934
public TimerRegistrationAction(
30-
TimerRegistration timerRegistration, byte[] serializedTimerData) {
35+
TimerRegistration timerRegistration,
36+
byte[] serializedTimerData,
37+
List<TimerRegistrationAction> containingList) {
3138
this.timerRegistration = timerRegistration;
3239
this.serializedTimerData = serializedTimerData;
40+
this.containingList = containingList;
3341
this.isRegistered = false;
3442
}
3543

3644
public void run() {
45+
registerTimer();
46+
cleanup();
47+
}
48+
49+
public void registerTimer() {
3750
if (!isRegistered) {
3851
timerRegistration.setTimer(serializedTimerData);
3952
isRegistered = true;
4053
}
4154
}
55+
56+
private void cleanup() {
57+
if (isRegistered) {
58+
containingList.remove(this);
59+
}
60+
}
4261
}

flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
195195

196196
private transient Environment environment;
197197

198-
private transient List<TimerRegistrationAction> unregisteredTimers;
198+
private transient volatile List<TimerRegistrationAction> unregisteredTimers;
199199

200200
public BeamPythonFunctionRunner(
201201
Environment environment,
@@ -311,7 +311,7 @@ public void open(ReadableConfig config) throws Exception {
311311
ShutdownHookUtil.addShutdownHook(
312312
this, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
313313

314-
unregisteredTimers = new LinkedList<>();
314+
unregisteredTimers = Collections.synchronizedList(new LinkedList<>());
315315
}
316316

317317
@Override
@@ -352,10 +352,12 @@ public void process(byte[] data) throws Exception {
352352

353353
@Override
354354
public void drainUnregisteredTimers() {
355-
for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) {
356-
timerRegistrationAction.run();
355+
synchronized (unregisteredTimers) {
356+
for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) {
357+
timerRegistrationAction.registerTimer();
358+
}
359+
unregisteredTimers.clear();
357360
}
358-
unregisteredTimers.clear();
359361
}
360362

361363
@Override
@@ -703,7 +705,9 @@ private TimerReceiverFactory createTimerReceiverFactory() {
703705
(timer, timerData) -> {
704706
TimerRegistrationAction timerRegistrationAction =
705707
new TimerRegistrationAction(
706-
timerRegistration, (byte[]) timer.getUserKey());
708+
timerRegistration,
709+
(byte[]) timer.getUserKey(),
710+
unregisteredTimers);
707711
unregisteredTimers.add(timerRegistrationAction);
708712
environment
709713
.getMainMailboxExecutor()

0 commit comments

Comments
 (0)