Skip to content

Commit

Permalink
fix(jdbc): jdbc runner don't purge schedule trigger when you update t…
Browse files Browse the repository at this point in the history
…he flow

close #731
  • Loading branch information
tchiotludo committed Sep 12, 2022
1 parent a19b707 commit 2ac32c8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 26 deletions.
76 changes: 55 additions & 21 deletions core/src/main/java/io/kestra/core/runners/FlowListeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import jakarta.inject.Inject;
import jakarta.inject.Named;
Expand All @@ -24,12 +26,13 @@ public class FlowListeners implements FlowListenersInterface {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
private static final TypeReference<List<Flow>> TYPE_REFERENCE = new TypeReference<>(){};

private Boolean isStarted = false;
private final QueueInterface<Flow> flowQueue;

private final List<Flow> flows;

private final List<Consumer<List<Flow>>> consumers = new ArrayList<>();

private final List<BiConsumer<Flow, Flow>> consumersEach = new ArrayList<>();

@Inject
public FlowListeners(
FlowRepositoryInterface flowRepository,
Expand All @@ -41,31 +44,48 @@ public FlowListeners(

@Override
public void run() {
this.flowQueue.receive(flow -> {
if (flow.isDeleted()) {
this.remove(flow);
} else {
this.upsert(flow);
}

if (log.isTraceEnabled()) {
log.trace("Received {} flow '{}.{}'",
flow.isDeleted() ? "deletion" : "update",
flow.getNamespace(),
flow.getId()
);
synchronized (this) {
if (!this.isStarted) {
this.isStarted = true;

this.flowQueue.receive(flow -> {
Optional<Flow> previous = this.previous(flow);

if (flow.isDeleted()) {
this.remove(flow);
} else {
this.upsert(flow);
}

if (log.isTraceEnabled()) {
log.trace(
"Received {} flow '{}.{}'",
flow.isDeleted() ? "deletion" : "update",
flow.getNamespace(),
flow.getId()
);
}

this.notifyConsumersEach(flow, previous.orElse(null));
this.notifyConsumers();
});

if (log.isTraceEnabled()) {
log.trace("FlowListenersService started with {} flows", flows.size());
}
}

this.notifyConsumers();
});

this.notifyConsumers();

if (log.isTraceEnabled()) {
log.trace("FlowListenersService started with {} flows", flows.size());
}
}

private Optional<Flow> previous(Flow flow) {
return flows
.stream()
.filter(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
.findFirst();
}

private boolean remove(Flow flow) {
synchronized (this) {
boolean remove = flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
Expand All @@ -92,6 +112,13 @@ private void notifyConsumers() {
}
}

private void notifyConsumersEach(Flow flow, Flow previous) {
synchronized (this) {
this.consumersEach
.forEach(consumer -> consumer.accept(flow, previous));
}
}

@Override
public synchronized void listen(Consumer<List<Flow>> consumer) {
synchronized (this) {
Expand All @@ -100,6 +127,13 @@ public synchronized void listen(Consumer<List<Flow>> consumer) {
}
}

@Override
public void listen(BiConsumer<Flow, Flow> consumer) {
synchronized (this) {
consumersEach.add(consumer);
}
}

@SneakyThrows
@Override
public List<Flow> flows() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import io.kestra.core.models.flows.Flow;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public interface FlowListenersInterface {
void run();

void listen(Consumer<List<Flow>> consumer);

void listen(BiConsumer<Flow, Flow> consumer);

List<Flow> flows();
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ public Trigger save(DSLContext dslContext, Trigger trigger) {

return trigger;
}

public Trigger delete(Trigger trigger) {
this.jdbcRepository.delete(trigger);

return trigger;
}
}
23 changes: 18 additions & 5 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.*;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.inject.qualifiers.Qualifiers;
Expand All @@ -19,7 +22,7 @@
@Replaces(DefaultScheduler.class)
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;
private final AbstractJdbcTriggerRepository triggerRepository;

@SuppressWarnings("unchecked")
@Inject
Expand All @@ -30,7 +33,7 @@ public JdbcScheduler(
super(applicationContext, flowListeners);

executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(TriggerRepositoryInterface.class);
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);

Expand All @@ -39,7 +42,7 @@ public JdbcScheduler(

@Override
public void run() {
flowListeners.run();
super.run();

// reset scheduler trigger at end
executionQueue.receive(
Expand All @@ -58,6 +61,16 @@ public void run() {
}
);

super.run();
// remove trigger on flow update
this.flowListeners.listen((flow, previous) -> {
if (flow.isDeleted()) {
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> triggerRepository.delete(Trigger.of(flow, abstractTrigger)));
} else if (previous != null) {
FlowService
.findRemovedTrigger(flow, previous)
.forEach(abstractTrigger -> triggerRepository.delete(Trigger.of(flow, abstractTrigger)));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.runner.kafka.services.*;
import io.micronaut.context.annotation.Replaces;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -22,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -134,4 +136,9 @@ public void listen(Consumer<List<Flow>> consumer) {
consumers.add(consumer);
consumer.accept(this.flows());
}

@Override
public void listen(BiConsumer<Flow, Flow> consumer) {
throw new NotImplementedException("Kafka runner don't need to listen on each flow changed since it's handle by stream");
}
}

0 comments on commit 2ac32c8

Please sign in to comment.