Skip to content

Commit 1bfc68f

Browse files
committed
feat(core,jdbc): small trigger / scheduler improvements
1 parent 569d78a commit 1bfc68f

File tree

8 files changed

+26
-24
lines changed

8 files changed

+26
-24
lines changed

Diff for: core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java

-5
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,6 @@ private void handle() {
337337
.conditionContext(flowWithTriggers.getConditionContext())
338338
.triggerContext(flowWithTriggers.TriggerContext.toBuilder().date(now()).stopAfter(flowWithTriggers.getAbstractTrigger().getStopAfter()).build())
339339
.build())
340-
.peek(f -> {
341-
if (f.getTriggerContext().getEvaluateRunningDate() != null || isExecutionNotRunning(f)) {
342-
this.triggerState.unlock(f.getTriggerContext());
343-
}
344-
})
345340
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
346341
.filter(this::isExecutionNotRunning)
347342
.map(FlowWithPollingTriggerNextDate::of)
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
package io.kestra.core.schedulers;
22

3+
import java.util.function.Consumer;
4+
35
// For tests purpose
4-
public class DefaultScheduleContext implements ScheduleContextInterface {}
6+
public class DefaultScheduleContext implements ScheduleContextInterface {
7+
@Override
8+
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
9+
consumer.accept(this);
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
11
package io.kestra.core.schedulers;
22

3+
import java.util.function.Consumer;
4+
5+
/**
6+
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
7+
* See AbstractScheduler.handle().
8+
*/
39
public interface ScheduleContextInterface {
10+
/**
11+
* Do trigger retrieval and updating in a single transaction.
12+
*/
13+
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
414
}

Diff for: core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ public interface SchedulerTriggerStateInterface {
2424

2525
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
2626

27-
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
2827

2928
/**
30-
* Required for Kafka
29+
* Used by the JDBC implementation: find triggers in all tenants.
3130
*/
32-
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
31+
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
3332

3433
/**
35-
* Required for Kafka
34+
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
3635
*/
37-
void unlock(Trigger trigger);
36+
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
3837
}

Diff for: jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void run() {
100100
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
101101
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
102102

103-
schedulerContext.startTransaction(scheduleContextInterface -> {
103+
schedulerContext.doInTransaction(scheduleContextInterface -> {
104104
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
105105

106106
consumer.accept(triggers, scheduleContextInterface);

Diff for: jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,14 @@ public JdbcSchedulerContext(JooqDSLContextWrapper dslContextWrapper) {
1818
this.dslContextWrapper = dslContextWrapper;
1919
}
2020

21-
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
21+
@Override
22+
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
2223
this.dslContextWrapper.transaction(configuration -> {
2324
this.context = DSL.using(configuration);
2425

2526
consumer.accept(this);
2627

27-
this.commit();
28+
this.context.commit();
2829
});
2930
}
30-
31-
public void commit() {
32-
this.context.commit();
33-
}
3431
}

Diff for: jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java

-3
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,4 @@ public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now
8484
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
8585
throw new NotImplementedException();
8686
}
87-
88-
@Override
89-
public void unlock(Trigger trigger) {}
9087
}

Diff for: runner-memory/src/main/java/io/kestra/runner/memory/MemorySchedulerTriggerState.java

-3
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,4 @@ public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now
7979
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
8080
throw new NotImplementedException();
8181
}
82-
83-
@Override
84-
public void unlock(Trigger trigger) {}
8582
}

0 commit comments

Comments
 (0)